diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index ef7229c..7f70685 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -67,6 +67,12 @@ public Resource divideAndCeil(Resource numerator, int denominator) {
}
@Override
+ public Resource divideAndCeil(Resource numerator, float denominator) {
+ // Only the memory size of the numerator is divided here; the quotient
+ // comes from the scalar divideAndCeil overload and is wrapped in a new
+ // Resource.
+ return Resources.createResource(
+ divideAndCeil(numerator.getMemorySize(), denominator));
+ }
+
+ @Override
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) {
if (stepFactor.getMemorySize() == 0) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 032aa02..18551d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -155,6 +155,14 @@ public Resource divideAndCeil(Resource numerator, int denominator) {
}
@Override
+ public Resource divideAndCeil(Resource numerator, float denominator) {
+ // Memory size and virtual cores are each divided by the same denominator,
+ // independently of one another, and combined into a new Resource.
+ return Resources.createResource(
+ divideAndCeil(numerator.getMemorySize(), denominator),
+ divideAndCeil(numerator.getVirtualCores(), denominator)
+ );
+ }
+
+ @Override
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) {
if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index a2f85b3..12a0a16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -65,6 +65,13 @@ public static int divideAndCeil(int a, int b) {
}
return (a + (b - 1)) / b;
}
+
+ /**
+  * Divides {@code a} by {@code b} and rounds the quotient up to the nearest
+  * integer.
+  *
+  * @param a numerator
+  * @param b denominator; may be fractional
+  * @return {@code ceil(a / b)}, or 0 when {@code b} is 0
+  */
+ public static int divideAndCeil(int a, float b) {
+   if (b == 0) {
+     return 0;
+   }
+   // The integer idiom (a + (b - 1)) / b is not a ceiling once b is
+   // fractional: b - 1 goes negative for b < 1, and rounding the result can
+   // overshoot for b > 1 (9 / 3.0 would yield 4). Take the ceiling of the real
+   // quotient instead; dividing in double keeps every int numerator exact.
+   return (int) Math.ceil((double) a / b);
+ }
public static long divideAndCeil(long a, long b) {
if (b == 0) {
@@ -73,6 +80,13 @@ public static long divideAndCeil(long a, long b) {
return (a + (b - 1)) / b;
}
+ /**
+  * Divides {@code a} by {@code b} and rounds the quotient up to the nearest
+  * integer.
+  *
+  * @param a numerator
+  * @param b denominator; may be fractional
+  * @return {@code ceil(a / b)}, or 0 when {@code b} is 0
+  */
+ public static long divideAndCeil(long a, float b) {
+   if (b == 0) {
+     return 0;
+   }
+   // (a + (b - 1)) / b is not a ceiling for a fractional b, and
+   // Math.round(float) returns an int, which capped the result at
+   // Integer.MAX_VALUE. Take the ceiling of the quotient in double precision
+   // and narrow it to long instead.
+   return (long) Math.ceil((double) a / b);
+ }
+
public static int roundUp(int a, int b) {
return divideAndCeil(a, b) * b;
}
@@ -198,6 +212,15 @@ public abstract float divide(
* @return resultant resource
*/
public abstract Resource divideAndCeil(Resource numerator, int denominator);
+
+ /**
+ * Divide-and-ceil numerator by a floating-point denominator.
+ * This is the {@code float} counterpart of
+ * {@link #divideAndCeil(Resource, int)}.
+ *
+ * @param numerator numerator resource
+ * @param denominator denominator, which may be fractional
+ * @return resultant resource
+ */
+ public abstract Resource divideAndCeil(Resource numerator, float denominator);
/**
* Check if a smaller resource can be contained by bigger resource.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 7020300..291657c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -286,6 +286,11 @@ public static Resource divideAndCeil(
ResourceCalculator resourceCalculator, Resource lhs, int rhs) {
return resourceCalculator.divideAndCeil(lhs, rhs);
}
+
+ /**
+ * Divide-and-ceil {@code lhs} by a floating-point {@code rhs}, delegating
+ * to the supplied calculator.
+ *
+ * @param resourceCalculator calculator that performs the division
+ * @param lhs numerator resource
+ * @param rhs denominator
+ * @return result of {@code resourceCalculator.divideAndCeil(lhs, rhs)}
+ */
+ public static Resource divideAndCeil(
+ ResourceCalculator resourceCalculator, Resource lhs, float rhs) {
+ return resourceCalculator.divideAndCeil(lhs, rhs);
+ }
public static boolean equals(Resource lhs, Resource rhs) {
return lhs.equals(rhs);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 1643390..6ce0df4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -110,6 +111,7 @@
protected ReentrantReadWriteLock.WriteLock writeLock;
volatile Priority priority = Priority.newInstance(0);
+ // Per-user weights in effect for this queue, keyed by user name.
+ private Map<String, Float> userWeights = new HashMap<String, Float>();
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -331,11 +333,43 @@ void setupQueueConfigs(Resource clusterResource)
this.priority = csContext.getConfiguration().getQueuePriority(
getQueuePath());
+
+ this.userWeights = constructUserWeights();
} finally {
writeLock.unlock();
}
}
+ /**
+  * Builds the user-weight map for this queue: the parent's weights (when
+  * there is a parent) overlaid with the weights configured directly on this
+  * queue. For a leaf queue, each directly configured weight is validated.
+  *
+  * @return map of user name to weight
+  * @throws IOException if a weight configured on a leaf queue is negative or
+  *         greater than 100 / user-limit
+  */
+ private Map<String, Float> constructUserWeights() throws IOException {
+   Map<String, Float> unionInheritedWeights = new HashMap<String, Float>();
+   CSQueue parentQ = getParent();
+   if (parentQ != null) {
+     // Inherit all of parent's user's weights
+     unionInheritedWeights.putAll(parentQ.getUserWeights());
+   }
+   // Insert this queue's user's weights, overriding parent's user's weights
+   // if there is overlap.
+   CapacitySchedulerConfiguration csConf = csContext.getConfiguration();
+   Map<String, Float> w = csConf.getAllUserWeightForQueue(getQueuePath());
+   unionInheritedWeights.putAll(w);
+
+   if (this instanceof LeafQueue) {
+     // Validate leaf queue's user's weights.
+     // NOTE(review): only weights configured directly on this queue are
+     // checked; weights inherited from the parent are not re-validated here.
+     int queueUL = Math.min(100, csConf.getUserLimit(getQueuePath()));
+     for (Entry<String, Float> e : w.entrySet()) {
+       float val = e.getValue().floatValue();
+       if (val < 0.0f || val > 100.0f / queueUL) {
+         throw new IOException("Weight (" + val + ") for user \"" + e.getKey()
+             + "\" must be between 0 and 100 / " + queueUL + " (= "
+             + 100.0f / queueUL + ", the number of concurrent active users in "
+             + getQueuePath() + ")");
+       }
+     }
+   }
+
+   return unionInheritedWeights;
+ }
+
private void initializeQueueState(QueueState previousState,
QueueState configuredState, QueueState parentState) {
// verify that we can not any value for State other than RUNNING/STOPPED
@@ -931,4 +965,9 @@ protected void appFinished() {
public Priority getPriority() {
return this.priority;
}
+
+ /**
+  * Returns the user weights in effect for this queue, as assembled when the
+  * queue configuration was last set up.
+  *
+  * @return map of user name to weight; this is the queue's own map, not a
+  *         copy
+  */
+ @Override
+ public Map<String, Float> getUserWeights() {
+   return userWeights;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index c6726ec..3a17d1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -350,4 +351,10 @@ public void validateSubmitApplication(ApplicationId applicationId,
* @return queue priority
*/
Priority getPriority();
+
+ /**
+  * Get the weights that apply to users of this queue.
+  * @return map of user name to the corresponding weight
+  */
+ Map<String, Float> getUserWeights();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 9fb92ec..2110157 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -108,6 +108,15 @@
public static final String USER_LIMIT_FACTOR = "user-limit-factor";
@Private
+ public static final String USER_WEIGHT = "weight";
+
+ @Private
+ public static final String USER_SETTINGS = "user-settings";
+
+ @Private
+ public static final float DEFAULT_USER_WEIGHT = 1.0f;
+
+ @Private
public static final String STATE = "state";
@Private
@@ -1404,4 +1413,29 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation);
}
+
+ /**
+  * Get the configured weights of all users at this queue level.
+  * Used in computing user-specific user limit, relative to other users.
+  * Weights are read from properties named
+  * (queue prefix) + "user-settings." + (user name) + ".weight".
+  * @param queuePath full queue path
+  * @return map of user weights, if they exist. Otherwise, return empty map.
+  */
+ public Map<String, Float> getAllUserWeightForQueue(String queuePath) {
+   Map<String, Float> userWeights = new HashMap<String, Float>();
+   // Escape the dots of the queue prefix so they match literally in the regex.
+   String qPathPlusPrefix =
+       getQueuePrefix(queuePath).replaceAll("\\.", "\\\\.")
+       + USER_SETTINGS + "\\.";
+   // NOTE(review): \w+ only matches user names made of word characters
+   // (letters, digits, underscore); other names are silently not picked up.
+   String weightKeyRegex =
+       qPathPlusPrefix + "\\w+\\." + USER_WEIGHT;
+   Map<String, String> props = getValByRegex(weightKeyRegex);
+   for (Entry<String, String> e : props.entrySet()) {
+     // Strip the queue prefix and the ".weight" suffix to leave the user name.
+     String userName =
+         e.getKey().replaceFirst(qPathPlusPrefix, "")
+             .replaceFirst("\\." + USER_WEIGHT, "");
+     if (!userName.isEmpty()) {
+       // Float.valueOf parses exactly like the deprecated new Float(String).
+       userWeights.put(userName, Float.valueOf(e.getValue()));
+     }
+   }
+   return userWeights;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index be6243d..b1cf5a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -157,6 +157,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf)
setQueueAcls(authorizer, appPriorityACLManager, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels());
this.queueStateManager.initialize(this);
+ updateUserWeights(queues);
LOG.info("Initialized root queue " + root);
}
@@ -186,6 +187,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
labelManager.reinitializeQueueLabels(getQueueToLabels());
this.queueStateManager.initialize(this);
+ updateUserWeights(queues);
}
/**
@@ -440,4 +442,17 @@ public Priority getDefaultPriorityForQueue(String queueName) {
getQueueStateManager() {
return this.queueStateManager;
}
+
+ /**
+  * Update weights for users currently in leaf queues. Queues that are not
+  * leaf queues are skipped.
+  * @param queues all queues in the cluster
+  */
+ private void updateUserWeights(Map<String, CSQueue> queues) {
+   for (CSQueue queue : queues.values()) {
+     if (queue instanceof LeafQueue) {
+       ((LeafQueue) queue).getUsersManager().updateUserWeights();
+     }
+   }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9059ef0..c3e12273 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -622,11 +622,16 @@ public Resource calculateAndGetAMResourceLimit() {
@VisibleForTesting
public Resource getUserAMResourceLimit() {
- return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
+ return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
+ null);
}
public Resource getUserAMResourceLimitPerPartition(
- String nodePartition) {
+ String nodePartition, String userName) {
+ float userWeight = 1.0f;
+ if (userName != null && getUser(userName) != null) {
+ userWeight = getUser(userName).getWeight();
+ }
try {
readLock.lock();
/*
@@ -637,6 +642,7 @@ public Resource getUserAMResourceLimitPerPartition(
*/
float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
+ effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
Resource queuePartitionResource = Resources
.multiplyAndNormalizeUp(resourceCalculator,
@@ -777,7 +783,8 @@ private void activateApplications() {
// Verify whether we already calculated user-am-limit for this label.
if (userAMLimit == null) {
- userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
+ userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
+ application.getUser());
userAmPartitionLimit.put(partitionName, userAMLimit);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java
index ff9d304..a1a8ecf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java
@@ -37,11 +37,14 @@
protected ResourceInfo AMResourceUsed;
protected ResourceInfo userResourceLimit;
protected ResourcesInfo resources;
+ private float userWeight;
+ private boolean isActive;
UserInfo() {}
UserInfo(String username, Resource resUsed, int activeApps, int pendingApps,
- Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage) {
+ Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage,
+ float weight, boolean isActive) {
this.username = username;
this.resourcesUsed = new ResourceInfo(resUsed);
this.numActiveApplications = activeApps;
@@ -49,6 +52,8 @@
this.AMResourceUsed = new ResourceInfo(amResUsed);
this.userResourceLimit = new ResourceInfo(resourceLimit);
this.resources = new ResourcesInfo(resourceUsage);
+ this.userWeight = weight;
+ this.isActive = isActive;
}
public String getUsername() {
@@ -78,4 +83,12 @@ public ResourceInfo getUserResourceLimit() {
public ResourcesInfo getResourceUsageInfo() {
return resources;
}
+
+ /**
+  * @return the weight this object was constructed with
+  */
+ public float getUserWeight() {
+   return this.userWeight;
+ }
+
+ /**
+  * @return the active flag this object was constructed with
+  */
+ public boolean getIsActive() {
+   return this.isActive;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index c2134eb..92e602a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -92,6 +92,9 @@
Map> preComputedActiveUserLimit = new ConcurrentHashMap<>();
Map> preComputedAllUserLimit = new ConcurrentHashMap<>();
+ private float activeUsersTimesWeights = 0.0f;
+ private float allUsersTimesWeights = 0.0f;
+
/**
* UsageRatios will store the total used resources ratio across all users of
* the queue.
@@ -158,6 +161,7 @@ private void setUsageRatio(String label, float ratio) {
private UsageRatios userUsageRatios = new UsageRatios();
private WriteLock writeLock;
+ private float weight;
public User(String name) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -253,6 +257,20 @@ public Resource getUserResourceLimit() {
public void setUserResourceLimit(Resource userResourceLimit) {
this.userResourceLimit = userResourceLimit;
}
+
+ /**
+  * Returns the weight currently assigned to this user.
+  *
+  * @return the weight
+  */
+ public float getWeight() {
+   return this.weight;
+ }
+
+ /**
+ * Sets the weight assigned to this user.
+ *
+ * @param weight the weight to set
+ */
+ public void setWeight(float weight) {
+ this.weight = weight;
+ }
} /* End of User class */
/**
@@ -373,6 +391,8 @@ public void removeUser(String userName) {
// Remove user from active/non-active list as well.
activeUsersSet.remove(userName);
nonActiveUsersSet.remove(userName);
+ activeUsersTimesWeights = countActiveUsersTimesWeights();
+ allUsersTimesWeights = countAllUsersTimesWeights();
} finally {
writeLock.unlock();
}
@@ -409,6 +429,8 @@ public User getUserAndAddIfAbsent(String userName) {
*/
private void addUser(String userName, User user) {
this.users.put(userName, user);
+ user.setWeight(getUserWeightFromQueue(userName));
+ allUsersTimesWeights = countAllUsersTimesWeights();
}
/**
@@ -425,7 +447,8 @@ private void addUser(String userName, User user) {
user.getActiveApplications(), user.getPendingApplications(),
Resources.clone(user.getConsumedAMResources()),
Resources.clone(user.getUserResourceLimit()),
- user.getResourceUsage()));
+ user.getResourceUsage(), user.getWeight(),
+ activeUsersSet.contains(user.userName)));
}
return usersToReturn;
} finally {
@@ -433,6 +456,11 @@ private void addUser(String userName, User user) {
}
}
+ /**
+  * Looks up the weight for a user in this manager's queue.
+  *
+  * @param userName user to look up
+  * @return the user's weight from the queue's user-weight map, or
+  *         {@code CapacitySchedulerConfiguration.DEFAULT_USER_WEIGHT} when
+  *         the map has no entry for the user
+  */
+ private float getUserWeightFromQueue(String userName) {
+   Float weight = lQueue.getUserWeights().get(userName);
+   // Use the declared default rather than a second copy of the literal.
+   return (weight == null)
+       ? CapacitySchedulerConfiguration.DEFAULT_USER_WEIGHT
+       : weight.floatValue();
+ }
+
/**
* Get computed user-limit for all ACTIVE users in this queue. If cached data
* is invalidated due to resource change, this method also enforce to
@@ -471,13 +499,24 @@ public Resource getComputedResourceLimitForActiveUsers(String userName,
writeLock.unlock();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("userLimit is fetched. userLimit = "
- + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
- + schedulingMode + ", partition=" + nodePartition);
+ Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
+ User user = getUser(userName);
+ float weight = (user == null) ? 1.0f : user.getWeight();
+ Resource userSpecificUserLimit =
+ Resources.multiplyAndNormalizeUp(resourceCalculator,
+ userLimitResource, weight, lQueue.getMinimumAllocation());
+
+ if (activeUsersSet.contains(userName)) {
+ getUser(userName).setUserResourceLimit(userSpecificUserLimit);
}
- return userLimitPerSchedulingMode.get(schedulingMode);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("userLimit is fetched. userLimit=" + userLimitResource
+ + ", userSpecificUserLimit=" + userSpecificUserLimit
+ + ", schedulingMode=" + schedulingMode
+ + ", partition=" + nodePartition);
+ }
+ return userSpecificUserLimit;
}
/**
@@ -518,13 +557,20 @@ public Resource getComputedResourceLimitForAllUsers(String userName,
writeLock.unlock();
}
+ Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
+ float weight = getUser(userName).getWeight();
+ Resource userSpecificUserLimit =
+ Resources.multiplyAndNormalizeUp(resourceCalculator,
+ userLimitResource, weight, lQueue.getMinimumAllocation());
+
if (LOG.isDebugEnabled()) {
- LOG.debug("userLimit is fetched. userLimit = "
- + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
- + schedulingMode + ", partition=" + nodePartition);
+ LOG.debug("userLimit is fetched. userLimit=" + userLimitResource
+ + ", userSpecificUserLimit=" + userSpecificUserLimit
+ + ", schedulingMode=" + schedulingMode
+ + ", partition=" + nodePartition);
}
- return userLimitPerSchedulingMode.get(schedulingMode);
+ return userSpecificUserLimit;
}
/*
@@ -647,16 +693,19 @@ private Resource computeUserLimit(String userName, Resource clusterResource,
queueCapacity, required);
/*
- * We want to base the userLimit calculation on max(queueCapacity,
- * usedResources+required). However, we want usedResources to be based on
- * the combined ratios of all the users in the queue so we use consumedRatio
- * to calculate such. The calculation is dependent on how the
- * resourceCalculator calculates the ratio between two Resources. DRF
- * Example: If usedResources is greater than queueCapacity and users have
- * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is
- * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio
- * is then 20+30=50%. Yes, this value can be larger than 100% but for the
- * purposes of making sure all users are getting their fair share, it works.
+ * We want to base the userLimit calculation on
+ * max(queueCapacity, usedResources+required). However, we want
+ * usedResources to be based on the combined ratios of all the users in the
+ * queue so we use consumedRatio to calculate such.
+ * The calculation is dependent on how the resourceCalculator calculates the
+ * ratio between two Resources. DRF Example: If usedResources is greater
+ * than queueCapacity and users have the following [mem,cpu] usages:
+ *
+ * User1: [10%,20%] - Dominant resource is 20%
+ * User2: [30%,10%] - Dominant resource is 30%
+ * The total consumedRatio is then 20+30=50%. Yes, this value can be
+ * larger than 100% but for the purposes of making sure all users are
+ * getting their fair share, it works.
*/
Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
partitionResource, getUsageRatio(nodePartition),
@@ -671,23 +720,23 @@ private Resource computeUserLimit(String userName, Resource clusterResource,
* capacity * user-limit-factor. Also, the queue's configured capacity
* should be higher than queue-hard-limit * ulMin
*/
- int usersCount = getNumActiveUsers();
+ float usersCountByWeight = activeUsersTimesWeights;
Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);
// For non-activeUser calculation, consider all users count.
if (!activeUser) {
resourceUsed = currentCapacity;
- usersCount = users.size();
+ usersCountByWeight = allUsersTimesWeights;
}
/*
- * User limit resource is determined by: max{currentCapacity / #activeUsers,
+ * User limit resource is determined by: max(currentCapacity / #activeUsers,
* currentCapacity * user-limit-percentage%)
*/
Resource userLimitResource = Resources.max(resourceCalculator,
partitionResource,
Resources.divideAndCeil(resourceCalculator, resourceUsed,
- usersCount),
+ usersCountByWeight),
Resources.divideAndCeil(resourceCalculator,
Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
100));
@@ -718,18 +767,26 @@ private Resource computeUserLimit(String userName, Resource clusterResource,
lQueue.getMinimumAllocation());
if (LOG.isDebugEnabled()) {
- LOG.debug("User limit computation for " + userName + " in queue "
- + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
- + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
- + required + " consumed: " + consumed + " user-limit-resource: "
- + userLimitResource + " queueCapacity: " + queueCapacity
- + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
- + " currentCapacity: " + currentCapacity + " activeUsers: "
- + usersCount + " clusterCapacity: " + clusterResource
- + " resourceByLabel: " + partitionResource + " usageratio: "
- + getUsageRatio(nodePartition) + " Partition: " + nodePartition);
- }
- getUser(userName).setUserResourceLimit(userLimitResource);
+ LOG.debug("User limit computation for " + userName
+ + ", in queue: " + lQueue.getQueueName()
+ + ", userLimitPercent=" + lQueue.getUserLimit()
+ + ", userLimitFactor=" + lQueue.getUserLimitFactor()
+ + ", required=" + required
+ + ", consumed=" + consumed
+ + ", user-limit-resource=" + userLimitResource
+ + ", queueCapacity=" + queueCapacity
+ + ", qconsumed=" + lQueue.getQueueResourceUsage().getUsed()
+ + ", currentCapacity=" + currentCapacity
+ + ", activeUsers=" + usersCountByWeight
+ + ", clusterCapacity=" + clusterResource
+ + ", resourceByLabel=" + partitionResource
+ + ", usageratio=" + getUsageRatio(nodePartition)
+ + ", Partition=" + nodePartition
+ + ", resourceUsed=" + resourceUsed
+ + ", maxUserLimit=" + maxUserLimit
+ + ", userWeight=" + getUser(userName).getWeight()
+ );
+ }
return userLimitResource;
}
@@ -829,6 +886,32 @@ public int getNumActiveUsers() {
return activeUsers.get();
}
+ /**
+  * Sums the weights of all users in the active-users set.
+  *
+  * @return sum of the active users' weights
+  */
+ float countActiveUsersTimesWeights() {
+   // Acquire the lock before entering try so that the finally block never
+   // unlocks a lock this thread failed to acquire.
+   this.readLock.lock();
+   try {
+     float count = 0.0f;
+     for (String u : activeUsersSet) {
+       count += getUser(u).getWeight();
+     }
+     return count;
+   } finally {
+     this.readLock.unlock();
+   }
+ }
+
+ /**
+  * Sums the weights of every user known to this manager, whether or not the
+  * user is in the active-users set.
+  *
+  * @return sum of all users' weights
+  */
+ float countAllUsersTimesWeights() {
+   // Acquire the lock before entering try so that the finally block never
+   // unlocks a lock this thread failed to acquire.
+   this.readLock.lock();
+   try {
+     float count = 0.0f;
+     for (String u : users.keySet()) {
+       count += getUser(u).getWeight();
+     }
+     return count;
+   } finally {
+     this.readLock.unlock();
+   }
+ }
+
private void updateActiveUsersResourceUsage(String userName) {
try {
this.writeLock.lock();
@@ -841,6 +924,7 @@ private void updateActiveUsersResourceUsage(String userName) {
if (nonActiveUsersSet.contains(userName)) {
nonActiveUsersSet.remove(userName);
activeUsersSet.add(userName);
+ activeUsersTimesWeights = countActiveUsersTimesWeights();
// Update total resource usage of active and non-active after user
// is moved from non-active to active.
@@ -881,6 +965,7 @@ private void updateNonActiveUsersResourceUsage(String userName) {
if (activeUsersSet.contains(userName)) {
activeUsersSet.remove(userName);
nonActiveUsersSet.add(userName);
+ activeUsersTimesWeights = countActiveUsersTimesWeights();
// Update total resource usage of active and non-active after user is
// moved from active to non-active.
@@ -981,4 +1066,16 @@ private void updateResourceUsagePerUser(User user, Resource resource,
+ totalResUsageForNonActiveUsers.getAllUsed());
}
}
+
+ /**
+  * Re-reads every known user's weight from the queue, refreshes the cached
+  * weight sums, and flags the user limits for recomputation.
+  */
+ public void updateUserWeights() {
+   // Lock before try so finally never unlocks an unacquired lock.
+   this.writeLock.lock();
+   try {
+     for (Map.Entry<String, User> ue : users.entrySet()) {
+       ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
+     }
+     // The cached sums are derived from the individual weights. Without this
+     // refresh they stay stale until the next user add, remove or activation
+     // change, and user limits would be computed from the old weights.
+     activeUsersTimesWeights = countActiveUsersTimesWeights();
+     allUsersTimesWeights = countAllUsersTimesWeights();
+     userLimitNeedsRecompute();
+   } finally {
+     this.writeLock.unlock();
+   }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 5c0b718..79299a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -882,8 +882,8 @@ protected void getPendingAppDiagnosticMessage(
.append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName));
diagnosticMessage.append("; ");
diagnosticMessage.append("User AM Resource Limit of the queue = ");
- diagnosticMessage.append(
- queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName));
+ diagnosticMessage.append(queue.getUserAMResourceLimitPerPartition(
+ appAMNodePartitionName, getUser()));
diagnosticMessage.append("; ");
diagnosticMessage.append("Queue AM Resource Usage = ");
diagnosticMessage.append(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
index b972428..292c5f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
@@ -68,6 +68,7 @@
"left:0%;background:none;border:1px dashed #BFBFBF";
static final String Q_OVER = "background:#FFA333";
static final String Q_UNDER = "background:#5BD75B";
+ static final String ACTIVE_USER = "background:#FFFF00"; // Yellow highlight
@RequestScoped
static class CSQInfo {
@@ -209,6 +210,7 @@ protected void render(Block html) {
html.table("#userinfo").thead().$class("ui-widget-header").tr().th()
.$class("ui-state-default")._("User Name")._().th()
.$class("ui-state-default")._("Max Resource")._().th()
+ .$class("ui-state-default")._("Weight")._().th()
.$class("ui-state-default")._("Used Resource")._().th()
.$class("ui-state-default")._("Max AM Resource")._().th()
.$class("ui-state-default")._("Used AM Resource")._().th()
@@ -229,8 +231,11 @@ protected void render(Block html) {
ResourceInfo amUsed = (resourceUsages.getAmUsed() == null)
? new ResourceInfo(Resources.none())
: resourceUsages.getAmUsed();
- tbody.tr().td(userInfo.getUsername())
+ String highlightIfAsking =
+ userInfo.getIsActive() ? ACTIVE_USER : null;
+ tbody.tr().$style(highlightIfAsking).td(userInfo.getUsername())
.td(userInfo.getUserResourceLimit().toString())
+ .td(String.valueOf(userInfo.getUserWeight()))
.td(resourcesUsed.toString())
.td(resourceUsages.getAMLimit().toString())
.td(amUsed.toString())
@@ -399,6 +404,8 @@ public void render(Block html) {
_("Used (over capacity)")._().
span().$class("qlegend ui-corner-all ui-state-default").
_("Max Capacity")._().
+ span().$class("qlegend ui-corner-all").$style(ACTIVE_USER).
+ _("Users Requesting Resources")._().
_();
float used = 0;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index b4ebd15..68dc147 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -475,7 +475,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
scheduler.moveApplication(app1.getApplicationId(), "a4");
// Check move to queue with accessible label ANY
scheduler.moveApplication(app1.getApplicationId(), "b");
- } catch (Exception e) {
+ } catch (Exception e) { e.printStackTrace(System.out);
fail("Should not throw exception since target queue has "
+ "required labels");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index a9ed5a9..4417132 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -955,7 +955,130 @@ public void testUserLimits() throws Exception {
// app_0 doesn't have outstanding resources, there's only one active user.
assertEquals("There should only be 1 active user!",
1, a.getAbstractUsersManager().getNumActiveUsers());
+ }
+
+ @Test
+ public void testUserSpecificUserLimits() throws Exception {
+ // Mock the queue
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+ // Set minimum-user-limit-percent for queue "a" in the configs.
+ csConf.setUserLimit(a.getQueuePath(), 50);
+ // Set weight for "user_0" to be 1.5 for the a queue in the configs.
+ csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath()
+ + ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT,
+ 1.5f);
+
+ when(csContext.getClusterResource())
+ .thenReturn(Resources.createResource(16 * GB, 32));
+ // Verify that configs were updated and parsed correctly.
+ Assert.assertNull(a.getUserWeights().get("user_0"));
+ a.reinitialize(a, csContext.getClusterResource());
+ assertEquals(1.5, a.getUserWeights().get("user_0").floatValue(), 0.0);
+
+ // set maxCapacity
+ a.setMaxCapacity(1.0f);
+
+ // Set minimum user-limit-percent
+ a.setUserLimit(50);
+ a.setUserLimitFactor(2);
+
+ // Users
+ final String user_0 = "user_0";
+ final String user_1 = "user_1";
+
+ // Set user_0's weight to 1.5 in the a queue's object.
+ a.getUsersManager().getUserAndAddIfAbsent(user_0).setWeight(1.5f);
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ a.getAbstractUsersManager(), spyRMContext);
+ a.submitApplicationAttempt(app_0, user_0);
+
+ final ApplicationAttemptId appAttemptId_1 =
+ TestUtils.getMockApplicationAttemptId(1, 0);
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+ a.getAbstractUsersManager(), spyRMContext);
+ a.submitApplicationAttempt(app_1, user_1); // different user
+
+ // Setup some nodes
+ String host_0 = "127.0.0.1";
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+ String host_1 = "127.0.0.2";
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+
+ final int numNodes = 2;
+ Resource clusterResource =
+ Resources.createResource(numNodes * (8*GB), numNodes * 16);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Setup resource-requests
+ // app_0 asks for 3 4-GB containers
+ Priority priority = TestUtils.createMockPriority(1);
+ app_0.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 3, true,
+ priority, recordFactory)));
+
+ // app_1 asks for 2 1-GB containers
+ app_1.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+ priority, recordFactory)));
+
+ Map apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+ app_1);
+ Map nodes = ImmutableMap.of(node_0.getNodeID(),
+ node_0, node_1.getNodeID(), node_1);
+
+ /**
+ * Start testing...
+ */
+
+ // There're two active users
+ assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
+
+ // 1 container to user_0. Since queue starts out empty, user limit would
+ // normally be calculated to be the minimum container size (1024MB).
+ // However, in this case, user_0 has a weight of 1.5, so the UL is 2048MB
+ // because 1024 * 1.5 rounded up to container size is 2048MB.
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+ assertEquals(4*GB, a.getUsedResources().getMemorySize());
+ assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize());
+ assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+
+ // At this point the queue-wide user limit is 3072MB, but since user_0 has a
+ // weight of 1.5, its user limit is 5120MB. So, even though user_0 already
+ // has 4096MB, it is under its user limit, so it gets another container.
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+ assertEquals(8*GB, a.getUsedResources().getMemorySize());
+ assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
+ assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+
+ // Queue-wide user limit at this point is 4096MB and user_0's user limit is
+ // 6144MB. user_0 has 8192MB.
+ // Now that user_0 is above its user limit, the next container should go to user_1
+ applyCSAssignment(clusterResource,
+ a.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+ assertEquals(9*GB, a.getUsedResources().getMemorySize());
+ assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
+ assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
+
+ assertEquals(4*GB,
+ app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize());
+ assertEquals(1*GB,
+ app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md
index 737bdc2..f1d4535 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md
@@ -124,6 +124,7 @@ Configuration
| `yarn.scheduler.capacity.<queue-path>.user-limit-factor` | The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue's configured capacity irrespective of how idle the cluster is. Value is specified as a float. |
| `yarn.scheduler.capacity.<queue-path>.maximum-allocation-mb` | The per queue maximum limit of memory to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-mb`. This value must be smaller than or equal to the cluster maximum. |
| `yarn.scheduler.capacity.<queue-path>.maximum-allocation-vcores` | The per queue maximum limit of virtual cores to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-vcores`. This value must be smaller than or equal to the cluster maximum. |
+| `yarn.scheduler.capacity.<queue-path>.user-settings.<user-name>.weight` | This floating point value is used when calculating the user limit resource values for users in a queue. This value will weight each user more or less than the other users in the queue. For example, if user A should receive 50% more resources in a queue than users B and C, this property will be set to 1.5 for user A. Users B and C will default to 1.0. |
* Running and Pending Application Limits