From d9f71a7a14025224520c903669f32005337e9ea8 Mon Sep 17 00:00:00 2001
From: Sunil
Date: Sat, 19 Nov 2016 21:26:39 +0530
Subject: [PATCH] YARN-5889

---
 .../resourcemanager/UserToPartitionRecord.java      | 88 +++++++++++++++++++++
 .../capacity/FifoIntraQueuePreemptionPlugin.java    |  6 +-
 .../scheduler/ActiveUsersManager.java               | 13 ++++
 .../scheduler/capacity/CapacityScheduler.java       | 90 ++++++++++++++++++++++
 .../capacity/CapacitySchedulerConfiguration.java    | 12 +++
 .../scheduler/capacity/LeafQueue.java               | 81 ++++++++++++++-----
 ...ionalCapacityPreemptionPolicyMockFramework.java  |  9 ++-
 7 files changed, 275 insertions(+), 24 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java
new file mode 100644
index 0000000..db35737
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+public class UserToPartitionRecord
+    implements Comparable<UserToPartitionRecord> {
+
+  private String userName;
+  private String partition;
+
+  public static UserToPartitionRecord newInstance(String userName,
+      String partition) {
+    UserToPartitionRecord userRecord = new UserToPartitionRecord();
+    userRecord.setPartition(partition);
+    userRecord.setUserName(userName);
+    return userRecord;
+  }
+
+  public String getPartition() {
+    return partition;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public void setPartition(String partition) {
+    this.partition = partition;
+  }
+
+  public void setUserName(String userName) {
+    this.userName = userName;
+  }
+
+  @Override
+  public int compareTo(UserToPartitionRecord o) {
+    int result = this.getUserName().compareTo(o.getUserName());
+    if (result == 0) {
+      return this.getPartition().compareTo(o.getPartition());
+    } else {
+      return result;
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    UserToPartitionRecord other = (UserToPartitionRecord) obj;
+    if (!getUserName().equals(other.getUserName())
+        || !getPartition().equals(other.getPartition())) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 493217;
+    int result = 8501;
+    result = prime * result + this.getUserName().hashCode();
+    result = prime * result + this.getPartition().hashCode();
+    return result;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 757f567..1e98c70 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -264,8 +265,9 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
 
       // Verify whether we already calculated headroom for this user.
       if (userLimitResource == null) {
-        userLimitResource = Resources.clone(tq.leafQueue
-            .getUserLimitPerUser(userName, partitionBasedResource, partition));
+        userLimitResource = Resources.clone(tq.leafQueue.getComputedUserLimit(
+            userName, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+            clusterResource, tq.leafQueue.getUser(userName)));
 
         Resource amUsed = perUserAMUsed.get(userName);
         if (null == amUsed) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
index 36e6858..1542a49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -26,6 +28,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.utils.Lock;
 
 /**
@@ -106,4 +109,14 @@ synchronized public void deactivateApplication(
   synchronized public int getNumActiveUsers() {
     return activeUsers;
   }
+
+  /**
+   * Get active users i.e. users with applications which have pending
+   * resource requests.
+   * @return collection of active users
+   */
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  synchronized public Collection<String> getActiveUsers() {
+    return Collections.unmodifiableCollection(usersApplications.keySet());
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 65a08c6..2a7bb38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -245,10 +245,13 @@ public Configuration getConf() {
 
   private ResourceCommitterService resourceCommitterService;
   private RMNodeLabelsManager labelManager;
+  private boolean computeUserLimitAsynchronously;
+  private ComputeUserLimitAsyncThread computeUserLimitAsyncThread;
 
   /**
    * EXPERT
    */
   private long asyncScheduleInterval;
+  private List<CSQueue> leafQueues;
   private static final String ASYNC_SCHEDULER_INTERVAL =
       CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
           + ".scheduling-interval-ms";
@@ -352,6 +355,11 @@ void initScheduler(Configuration configuration) throws
     // Setup how many containers we can allocate for each round
     offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
 
+    computeUserLimitAsynchronously = this.conf.getUserLimitAsynchronously();
+    if (computeUserLimitAsynchronously) {
+      computeUserLimitAsyncThread = new ComputeUserLimitAsyncThread(this);
+    }
+
     LOG.info("Initialized CapacityScheduler with " + "calculator="
         + getResourceCalculator().getClass() + ", " + "minimumAllocation=<"
         + getMinimumResourceCapability() + ">, " + "maximumAllocation=<"
@@ -391,6 +399,7 @@ public void serviceInit(Configuration conf) throws Exception {
 
   @Override
   public void serviceStart() throws Exception {
     startSchedulerThreads();
+    if (computeUserLimitAsynchronously) {
+      computeUserLimitAsyncThread.start();
+    }
     super.serviceStart();
   }
 
@@ -406,6 +415,10 @@ public void serviceStop() throws Exception {
         resourceCommitterService.interrupt();
         resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
       }
+      if (computeUserLimitAsynchronously) {
+        computeUserLimitAsyncThread.interrupt();
+        computeUserLimitAsyncThread.join(THREAD_JOIN_TIMEOUT_MS);
+      }
     } finally {
       writeLock.unlock();
     }
@@ -513,6 +526,44 @@ public void suspendSchedule() {
   }
 
+  static class ComputeUserLimitAsyncThread extends Thread {
+    private final CapacityScheduler cs;
+
+    public ComputeUserLimitAsyncThread(CapacityScheduler cs) {
+      this.cs = cs;
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          List<CSQueue> leafQueues = cs.getAllLeafQueues();
+
+          // Compute userlimit per queue.
+          for (CSQueue queue : leafQueues) {
+            LeafQueue lqueue = (LeafQueue) queue;
+
+            Collection<String> users = lqueue.getActiveUsersManager()
+                .getActiveUsers();
+            for (String user : users) {
+              Set<String> partitions = lqueue.getNodeLabelsForQueue();
+              for (String partition : partitions) {
+                lqueue.storeComputedUserLimits(user, partition,
+                    cs.getClusterResource());
+              }
+            }
+          }
+
+          // Sleep for a millisecond between passes.
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+          // sleep() clears the interrupt flag; restore it so the loop exits.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
 
   static class ResourceCommitterService extends Thread {
     private final CapacityScheduler cs;
     private BlockingQueue>
@@ -622,6 +673,7 @@ private void initializeQueues(CapacitySchedulerConfiguration conf)
     LOG.info("Initialized root queue " + root);
     updatePlacementRules();
     setQueueAcls(authorizer, queues);
+    setAllLeafQueues();
 
     // Notify Preemption Manager
     preemptionManager.refreshQueues(null, root);
@@ -653,11 +705,49 @@ private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     setQueueAcls(authorizer, queues);
+    setAllLeafQueues();
 
     // Notify Preemption Manager
     preemptionManager.refreshQueues(null, root);
   }
 
+  private void setAllLeafQueues() {
+    try {
+      writeLock.lock();
+      List<CSQueue> leafQueues = new ArrayList<>();
+      fetchLeafQueues(getRootQueue(), leafQueues);
+      this.leafQueues = leafQueues;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public List<CSQueue> getAllLeafQueues() {
+    try {
+      readLock.lock();
+      return this.leafQueues;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private void fetchLeafQueues(CSQueue curQueue, List<CSQueue> leafQueues) {
+    try {
+      readLock.lock();
+      if (curQueue instanceof ParentQueue) {
+        // Recursively add children
+        for (CSQueue c : curQueue.getChildQueues()) {
+          if (c instanceof LeafQueue) {
+            leafQueues.add(c);
+          }
+          fetchLeafQueues(c, leafQueues);
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @VisibleForTesting
   public static void setQueueAcls(YarnAuthorizationProvider authorizer,
       Map<String, CSQueue> queues) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index f8335a8..140f890 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -216,6 +216,9 @@
   public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
 
   @Private
+  public static final boolean DEFAULT_COMPUTE_USERLIMIT_ASYNCHRONOUSLY_ENABLE = true;
+
+  @Private
   public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
 
   @Private
@@ -274,6 +277,10 @@
   @Private
   public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
 
+  @Private
+  public static final String COMPUTE_USERLIMIT_ASYNCHRONOUSLY_ENABLE =
+      PREFIX + "compute-userlimit-asynchronously.enable";
+
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
   }
@@ -783,6 +790,11 @@ public boolean getOverrideWithQueueMappings() {
         DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
   }
 
+  public boolean getUserLimitAsynchronously() {
+    return getBoolean(COMPUTE_USERLIMIT_ASYNCHRONOUSLY_ENABLE,
+        DEFAULT_COMPUTE_USERLIMIT_ASYNCHRONOUSLY_ENABLE);
+  }
+
   /**
    * Returns a collection of strings, trimming leading and trailing whitespeace
    * on each value
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9661206..6d5a6c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.UserToPartitionRecord;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -139,6 +140,9 @@
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
       new ConcurrentHashMap<>();
 
+  Map<UserToPartitionRecord, List<Resource>> preComputedUserLimit =
+      new ConcurrentHashMap<>();
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs, String queueName,
       CSQueue parent, CSQueue old) throws IOException {
@@ -1249,8 +1253,8 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application, String partition) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-        computeUserLimit(application.getUser(), clusterResource, user,
-            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+        getComputedUserLimit(application.getUser(), partition,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, clusterResource, user),
         partition);
   }
 
@@ -1322,9 +1326,8 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
-    Resource userLimit =
-        computeUserLimit(application.getUser(), clusterResource, queueUser,
-            nodePartition, schedulingMode);
+    Resource userLimit = getComputedUserLimit(application.getUser(),
+        nodePartition, schedulingMode, clusterResource, queueUser);
 
     setQueueResourceLimitsInfo(clusterResource);
 
@@ -1360,6 +1363,56 @@ public boolean getRackLocalityFullReset() {
     return rackLocalityFullReset;
   }
 
+  public Resource getComputedUserLimit(String userName, String nodePartition,
+      SchedulingMode schedulingMode, Resource clusterResource, User user) {
+
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+    List<Resource> userLimitPerSchedulingMode = preComputedUserLimit
+        .get(userRecord);
+
+    if (userLimitPerSchedulingMode != null
+        && !userLimitPerSchedulingMode.isEmpty()) {
+      if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+        // Userlimit adhering to partition exclusivity will be at the 0th
+        // position.
+        return userLimitPerSchedulingMode.get(0);
+      } else {
+        return userLimitPerSchedulingMode.get(1);
+      }
+    }
+
+    // No pre-computed user limit is available yet (or async computation is
+    // disabled), so fall back to the synchronous calculation.
+    return computeUserLimit(userName, clusterResource, user, nodePartition,
+        schedulingMode);
+  }
+
+  public void storeComputedUserLimits(String userName, String nodePartition,
+      Resource clusterResource) {
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+
+    // Build the list fully before publishing it so that concurrent readers
+    // never observe an empty or half-filled entry.
+    List<Resource> userLimitPerSchedulingMode = new ArrayList<Resource>();
+
+    Resource userLimitWithExclusivity = computeUserLimit(userName,
+        clusterResource, getUser(userName), nodePartition,
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Resource userLimitWithOutExclusivity = computeUserLimit(userName,
+        clusterResource, getUser(userName), nodePartition,
+        SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
+    userLimitPerSchedulingMode.add(userLimitWithExclusivity);
+    userLimitPerSchedulingMode.add(userLimitWithOutExclusivity);
+
+    preComputedUserLimit.put(userRecord, userLimitPerSchedulingMode);
+  }
+
   @Lock(NoLock.class)
   private Resource computeUserLimit(String userName,
       Resource clusterResource, User user,
@@ -2137,10 +2190,10 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
         String userName = app.getUser();
         if (!userNameToHeadroom.containsKey(userName)) {
           User user = getUser(userName);
-          Resource headroom = Resources.subtract(
-              computeUserLimit(app.getUser(), resources, user, partition,
-                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
-              user.getUsed(partition));
+          Resource headroom = Resources
+              .subtract(getComputedUserLimit(app.getUser(), partition,
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, resources,
+                  user), user.getUsed(partition));
 
           // Make sure headroom is not negative.
           headroom = Resources.componentwiseMax(headroom, Resources.none());
           userNameToHeadroom.put(userName, headroom);
@@ -2160,16 +2213,6 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
   }
 
-  public synchronized Resource getUserLimitPerUser(String userName,
-      Resource resources, String partition) {
-
-    // Check user resource limit
-    User user = getUser(userName);
-
-    return computeUserLimit(userName, resources, user, partition,
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-  }
-
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 0281c19..c2897fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -42,8 +42,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -361,9 +363,10 @@ private void mockApplications(String appsConfig) {
           queue.getQueueCapacities().getAbsoluteCapacity());
       HashSet<String> users = userMap.get(queue.getQueueName());
       Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
-      for (String user : users) {
-        when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
-            anyString())).thenReturn(userLimit);
+      for (String userName : users) {
+        when(queue.getComputedUserLimit(eq(userName), anyString(),
+            eq(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+            any(Resource.class), any(User.class))).thenReturn(userLimit);
       }
     }
   }
-- 
2.7.4 (Apple Git-66)
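
Notes (the snippets below are illustrative sketches, not part of the diff). UserToPartitionRecord is the key of LeafQueue#preComputedUserLimit, so cache hits depend on equals()/hashCode() comparing the user name and partition values rather than object identity; the user "alice", the "gpu" partition and the resource sizes are made-up values:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  import org.apache.hadoop.yarn.api.records.Resource;
  import org.apache.hadoop.yarn.server.resourcemanager.UserToPartitionRecord;
  import org.apache.hadoop.yarn.util.resource.Resources;

  public class UserToPartitionRecordKeyExample {
    public static void main(String[] args) {
      Map<UserToPartitionRecord, List<Resource>> cache = new ConcurrentHashMap<>();

      // Writer side: the async thread stores limits under one record instance.
      UserToPartitionRecord stored = UserToPartitionRecord.newInstance("alice", "gpu");
      List<Resource> limits = new ArrayList<>();
      limits.add(Resources.createResource(4096, 4));  // RESPECT_PARTITION_EXCLUSIVITY
      limits.add(Resources.createResource(8192, 8));  // IGNORE_PARTITION_EXCLUSIVITY
      cache.put(stored, limits);

      // Reader side: the allocation path builds a *new* record for the same
      // user and partition; the lookup only hits because equals()/hashCode()
      // compare field values, not references.
      UserToPartitionRecord lookup = UserToPartitionRecord.newInstance("alice", "gpu");
      System.out.println(cache.containsKey(lookup));  // true
    }
  }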
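
The cached value per key is a two-slot list: index 0 holds the limit computed with RESPECT_PARTITION_EXCLUSIVITY and index 1 the one computed with IGNORE_PARTITION_EXCLUSIVITY, and getComputedUserLimit() falls back to the inline computation when nothing has been published yet. A simplified, self-contained sketch of that layout and read path, with plain longs and a string key standing in for Resource and UserToPartitionRecord:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  public class PrecomputedLimitLayoutExample {
    enum Mode { RESPECT_PARTITION_EXCLUSIVITY, IGNORE_PARTITION_EXCLUSIVITY }

    public static void main(String[] args) {
      // Mirrors LeafQueue#preComputedUserLimit: one two-slot list per key.
      Map<String, List<Long>> cache = new ConcurrentHashMap<>();

      // Writer (async thread): build the list fully, then publish it.
      List<Long> limits = new ArrayList<>();
      limits.add(4096L);  // slot 0: RESPECT_PARTITION_EXCLUSIVITY
      limits.add(8192L);  // slot 1: IGNORE_PARTITION_EXCLUSIVITY
      cache.put("alice/gpu", limits);

      // Reader (allocation path): pick the slot for the requested mode, or
      // fall back to an inline computation when nothing is cached yet.
      Mode mode = Mode.RESPECT_PARTITION_EXCLUSIVITY;
      List<Long> cached = cache.get("alice/gpu");
      long limit = (cached != null && !cached.isEmpty())
          ? cached.get(mode == Mode.RESPECT_PARTITION_EXCLUSIVITY ? 0 : 1)
          : computeInline();
      System.out.println(limit);  // 4096
    }

    private static long computeInline() {
      return 2048L;  // stand-in for LeafQueue#computeUserLimit(...)
    }
  }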
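
CapacityScheduler#serviceStop() interrupts the async thread and joins it with a timeout, so run() has to exit once interrupted; because Thread.sleep() clears the interrupt flag when it throws, the catch block must restore it. A standalone sketch of that shutdown pattern (plain Java, not RM code):

  public class InterruptiblePollerExample {
    public static void main(String[] args) throws Exception {
      Thread poller = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            // ... recompute cached values here ...
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // sleep() cleared the interrupt flag; restore it so the
            // while-condition sees the shutdown request and the loop ends.
            Thread.currentThread().interrupt();
          }
        }
      });
      poller.setDaemon(true);
      poller.start();

      Thread.sleep(10);
      poller.interrupt();        // what serviceStop() does
      poller.join(1000);
      System.out.println("poller alive: " + poller.isAlive());  // false
    }
  }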
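
The new knob resolves to yarn.scheduler.capacity.compute-userlimit-asynchronously.enable (PREFIX is "yarn.scheduler.capacity.") and defaults to true, so the async user-limit thread is created and started unless it is explicitly disabled. A small sketch of toggling it programmatically, e.g. from a test, using the constant and getter added by this patch:

  import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

  public class UserLimitAsyncToggleExample {
    public static void main(String[] args) {
      CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

      // On by default; the async thread is only created and started when this
      // evaluates to true in initScheduler().
      System.out.println(conf.getUserLimitAsynchronously());  // true

      // Equivalent to setting
      // yarn.scheduler.capacity.compute-userlimit-asynchronously.enable=false
      // in capacity-scheduler.xml.
      conf.setBoolean(
          CapacitySchedulerConfiguration.COMPUTE_USERLIMIT_ASYNCHRONOUSLY_ENABLE,
          false);
      System.out.println(conf.getUserLimitAsynchronously());  // false
    }
  }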