Coverage Summary for Class: UsersManager (org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity)
| Class | Method, % | Line, % |
|---|---|---|
| UsersManager | 23.1% (9/ 39) | 19.6% (70/ 358) |
| UsersManager$UsageRatios | 60% (3/ 5) | 42.9% (12/ 28) |
| UsersManager$User | 0% (0/ 21) | 0% (0/ 51) |
| total | 18.5% (12/ 65) | 18.8% (82/ 437) |
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
19
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.locks.ReentrantReadWriteLock;
28 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
29 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
30
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.apache.hadoop.classification.InterfaceAudience.Private;
34 import org.apache.hadoop.yarn.api.records.ApplicationId;
35 import org.apache.hadoop.yarn.api.records.Resource;
36 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
37 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
38 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
39 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
40 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
41 import org.apache.hadoop.yarn.util.resource.Resources;
42
43 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
44
45 /**
46 * {@link UsersManager} tracks users in the system and its respective data
47 * structures.
48 */
49 @Private
50 public class UsersManager implements AbstractUsersManager {
51
52 private static final Logger LOG =
53 LoggerFactory.getLogger(UsersManager.class);
54
55 /*
56 * Member declaration for UsersManager class.
57 */
58 private final LeafQueue lQueue;
59 private final RMNodeLabelsManager labelManager;
60 private final ResourceCalculator resourceCalculator;
61 private final CapacitySchedulerContext scheduler;
62 private Map<String, User> users = new ConcurrentHashMap<>();
63
64 private ResourceUsage totalResUsageForActiveUsers = new ResourceUsage();
65 private ResourceUsage totalResUsageForNonActiveUsers = new ResourceUsage();
66 private Set<String> activeUsersSet = new HashSet<String>();
67 private Set<String> nonActiveUsersSet = new HashSet<String>();
68
69 // Summation of consumed ratios for all users in queue
70 private UsageRatios qUsageRatios;
71
72 // To detect whether there is a change in user count for every user-limit
73 // calculation.
74 private long latestVersionOfUsersState = 0;
75 private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
76 new HashMap<String, Map<SchedulingMode, Long>>();
77 private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
78 new HashMap<String, Map<SchedulingMode, Long>>();
79
80 private volatile float userLimit;
81 private volatile float userLimitFactor;
82
83 private WriteLock writeLock;
84 private ReadLock readLock;
85
86 private final QueueMetrics metrics;
87 private AtomicInteger activeUsers = new AtomicInteger(0);
88 private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0);
89 private Map<String, Set<ApplicationId>> usersApplications =
90 new HashMap<String, Set<ApplicationId>>();
91
92 // Pre-computed list of user-limits.
93 @VisibleForTesting
94 Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit =
95 new HashMap<>();
96 @VisibleForTesting
97 Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit =
98 new HashMap<>();
99
100 private float activeUsersTimesWeights = 0.0f;
101 private float allUsersTimesWeights = 0.0f;
102
103 /**
104 * UsageRatios will store the total used resources ratio across all users of
105 * the queue.
106 */
107 static private class UsageRatios {
108 private Map<String, Float> usageRatios;
109 private ReadLock readLock;
110 private WriteLock writeLock;
111
112 public UsageRatios() {
113 ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
114 readLock = lock.readLock();
115 writeLock = lock.writeLock();
116 usageRatios = new HashMap<String, Float>();
117 }
118
119 private void incUsageRatio(String label, float delta) {
120 writeLock.lock();
121 try {
122 float usage = 0f;
123 if (usageRatios.containsKey(label)) {
124 usage = usageRatios.get(label);
125 }
126 usage += delta;
127 usageRatios.put(label, usage);
128 } finally {
129 writeLock.unlock();
130 }
131 }
132
133 private float getUsageRatio(String label) {
134 readLock.lock();
135 try {
136 Float f = usageRatios.get(label);
137 if (null == f) {
138 return 0.0f;
139 }
140 return f;
141 } finally {
142 readLock.unlock();
143 }
144 }
145
146 private void setUsageRatio(String label, float ratio) {
147 writeLock.lock();
148 try {
149 usageRatios.put(label, ratio);
150 } finally {
151 writeLock.unlock();
152 }
153 }
154 } /* End of UserRatios class */
155
156 /**
157 * User class stores all user related resource usage, application details.
158 */
159 @VisibleForTesting
160 public static class User {
161 ResourceUsage userResourceUsage = new ResourceUsage();
162 String userName = null;
163 volatile Resource userResourceLimit = Resource.newInstance(0, 0);
164 private volatile AtomicInteger pendingApplications = new AtomicInteger(0);
165 private volatile AtomicInteger activeApplications = new AtomicInteger(0);
166
167 private UsageRatios userUsageRatios = new UsageRatios();
168 private WriteLock writeLock;
169 private float weight;
170
171 public User(String name) {
172 ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
173 // Nobody uses read-lock now, will add it when necessary
174 writeLock = lock.writeLock();
175
176 this.userName = name;
177 }
178
179 public ResourceUsage getResourceUsage() {
180 return userResourceUsage;
181 }
182
183 public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
184 Resource resource, String nodePartition) {
185 writeLock.lock();
186 try {
187 userUsageRatios.setUsageRatio(nodePartition, 0);
188 return updateUsageRatio(resourceCalculator, resource, nodePartition);
189 } finally {
190 writeLock.unlock();
191 }
192 }
193
194 public float updateUsageRatio(ResourceCalculator resourceCalculator,
195 Resource resource, String nodePartition) {
196 writeLock.lock();
197 try {
198 float delta;
199 float newRatio = Resources.ratio(resourceCalculator,
200 getUsed(nodePartition), resource);
201 delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
202 userUsageRatios.setUsageRatio(nodePartition, newRatio);
203 return delta;
204 } finally {
205 writeLock.unlock();
206 }
207 }
208
209 public Resource getUsed() {
210 return userResourceUsage.getUsed();
211 }
212
213 public Resource getAllUsed() {
214 return userResourceUsage.getAllUsed();
215 }
216
217 public Resource getUsed(String label) {
218 return userResourceUsage.getUsed(label);
219 }
220
221 public int getPendingApplications() {
222 return pendingApplications.get();
223 }
224
225 public int getActiveApplications() {
226 return activeApplications.get();
227 }
228
229 public Resource getConsumedAMResources() {
230 return userResourceUsage.getAMUsed();
231 }
232
233 public Resource getConsumedAMResources(String label) {
234 return userResourceUsage.getAMUsed(label);
235 }
236
237 public int getTotalApplications() {
238 return getPendingApplications() + getActiveApplications();
239 }
240
241 public void submitApplication() {
242 pendingApplications.incrementAndGet();
243 }
244
245 public void activateApplication() {
246 pendingApplications.decrementAndGet();
247 activeApplications.incrementAndGet();
248 }
249
250 public void finishApplication(boolean wasActive) {
251 if (wasActive) {
252 activeApplications.decrementAndGet();
253 } else {
254 pendingApplications.decrementAndGet();
255 }
256 }
257
258 public Resource getUserResourceLimit() {
259 return userResourceLimit;
260 }
261
262 public void setUserResourceLimit(Resource userResourceLimit) {
263 this.userResourceLimit = userResourceLimit;
264 }
265
266 public String getUserName() {
267 return this.userName;
268 }
269
270 @VisibleForTesting
271 public void setResourceUsage(ResourceUsage resourceUsage) {
272 this.userResourceUsage = resourceUsage;
273 }
274
275 /**
276 * @return the weight
277 */
278 public float getWeight() {
279 return weight;
280 }
281
282 /**
283 * @param weight the weight to set
284 */
285 public void setWeight(float weight) {
286 this.weight = weight;
287 }
288 } /* End of User class */
289
290 /**
291 * UsersManager Constructor.
292 *
293 * @param metrics
294 * Queue Metrics
295 * @param lQueue
296 * Leaf Queue Object
297 * @param labelManager
298 * Label Manager instance
299 * @param scheduler
300 * Capacity Scheduler Context
301 * @param resourceCalculator
302 * rc
303 */
304 public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
305 RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
306 ResourceCalculator resourceCalculator) {
307 ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
308 this.lQueue = lQueue;
309 this.scheduler = scheduler;
310 this.labelManager = labelManager;
311 this.resourceCalculator = resourceCalculator;
312 this.qUsageRatios = new UsageRatios();
313 this.metrics = metrics;
314
315 this.writeLock = lock.writeLock();
316 this.readLock = lock.readLock();
317 }
318
319 /**
320 * Get configured user-limit.
321 * @return user limit
322 */
323 public float getUserLimit() {
324 return userLimit;
325 }
326
327 /**
328 * Set configured user-limit.
329 * @param userLimit user limit
330 */
331 public void setUserLimit(float userLimit) {
332 this.userLimit = userLimit;
333 }
334
335 /**
336 * Get configured user-limit factor.
337 * @return user-limit factor
338 */
339 public float getUserLimitFactor() {
340 return userLimitFactor;
341 }
342
343 /**
344 * Set configured user-limit factor.
345 * @param userLimitFactor User Limit factor.
346 */
347 public void setUserLimitFactor(float userLimitFactor) {
348 this.userLimitFactor = userLimitFactor;
349 }
350
351 @VisibleForTesting
352 public float getUsageRatio(String label) {
353 return qUsageRatios.getUsageRatio(label);
354 }
355
356 /**
357 * Force UsersManager to recompute userlimit.
358 */
359 public void userLimitNeedsRecompute() {
360
361 // If latestVersionOfUsersState is negative due to overflow, ideally we need
362 // to reset it. This method is invoked from UsersManager and LeafQueue and
363 // all is happening within write/readLock. Below logic can help to set 0.
364 writeLock.lock();
365 try {
366
367 long value = ++latestVersionOfUsersState;
368 if (value < 0) {
369 latestVersionOfUsersState = 0;
370 }
371 } finally {
372 writeLock.unlock();
373 }
374 }
375
376 /*
377 * Get all users of queue.
378 */
379 public Map<String, User> getUsers() {
380 return users;
381 }
382
383 /**
384 * Get user object for given user name.
385 *
386 * @param userName
387 * User Name
388 * @return User object
389 */
390 public User getUser(String userName) {
391 return users.get(userName);
392 }
393
394 /**
395 * Remove user.
396 *
397 * @param userName
398 * User Name
399 */
400 public void removeUser(String userName) {
401 writeLock.lock();
402 try {
403 this.users.remove(userName);
404
405 // Remove user from active/non-active list as well.
406 activeUsersSet.remove(userName);
407 nonActiveUsersSet.remove(userName);
408 activeUsersTimesWeights = sumActiveUsersTimesWeights();
409 allUsersTimesWeights = sumAllUsersTimesWeights();
410 } finally {
411 writeLock.unlock();
412 }
413 }
414
415 /**
416 * Get and add user if absent.
417 *
418 * @param userName
419 * User Name
420 * @return User object
421 */
422 public User getUserAndAddIfAbsent(String userName) {
423 writeLock.lock();
424 try {
425 User u = getUser(userName);
426 if (null == u) {
427 u = new User(userName);
428 addUser(userName, u);
429
430 // Add to nonActive list so that resourceUsage could be tracked
431 if (!nonActiveUsersSet.contains(userName)) {
432 nonActiveUsersSet.add(userName);
433 }
434 }
435 return u;
436 } finally {
437 writeLock.unlock();
438 }
439 }
440
441 /*
442 * Add a new user
443 */
444 private void addUser(String userName, User user) {
445 this.users.put(userName, user);
446 user.setWeight(getUserWeightFromQueue(userName));
447 allUsersTimesWeights = sumAllUsersTimesWeights();
448 }
449
450 /**
451 * @return an ArrayList of UserInfo objects who are active in this queue
452 */
453 public ArrayList<UserInfo> getUsersInfo() {
454 readLock.lock();
455 try {
456 ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
457 for (Map.Entry<String, User> entry : getUsers().entrySet()) {
458 User user = entry.getValue();
459 usersToReturn.add(
460 new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
461 user.getActiveApplications(), user.getPendingApplications(),
462 Resources.clone(user.getConsumedAMResources()),
463 Resources.clone(user.getUserResourceLimit()),
464 user.getResourceUsage(), user.getWeight(),
465 activeUsersSet.contains(user.userName)));
466 }
467 return usersToReturn;
468 } finally {
469 readLock.unlock();
470 }
471 }
472
473 private float getUserWeightFromQueue(String userName) {
474 Float weight = lQueue.getUserWeights().get(userName);
475 return (weight == null) ? 1.0f : weight.floatValue();
476 }
477
478 /**
479 * Get computed user-limit for all ACTIVE users in this queue. If cached data
480 * is invalidated due to resource change, this method also enforce to
481 * recompute user-limit.
482 *
483 * @param userName
484 * Name of user who has submitted one/more app to given queue.
485 * @param clusterResource
486 * total cluster resource
487 * @param nodePartition
488 * partition name
489 * @param schedulingMode
490 * scheduling mode
491 * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
492 * @return Computed User Limit
493 */
494 public Resource getComputedResourceLimitForActiveUsers(String userName,
495 Resource clusterResource, String nodePartition,
496 SchedulingMode schedulingMode) {
497
498 Map<SchedulingMode, Resource> userLimitPerSchedulingMode;
499
500 writeLock.lock();
501 try {
502 userLimitPerSchedulingMode =
503 preComputedActiveUserLimit.get(nodePartition);
504 if (isRecomputeNeeded(schedulingMode, nodePartition, true)) {
505 // recompute
506 userLimitPerSchedulingMode = reComputeUserLimits(userName,
507 nodePartition, clusterResource, schedulingMode, true);
508
509 // update user count to cache so that we can avoid recompute if no major
510 // changes.
511 setLocalVersionOfUsersState(nodePartition, schedulingMode, true);
512 }
513 } finally {
514 writeLock.unlock();
515 }
516
517 Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
518 User user = getUser(userName);
519 float weight = (user == null) ? 1.0f : user.getWeight();
520 Resource userSpecificUserLimit =
521 Resources.multiplyAndNormalizeDown(resourceCalculator,
522 userLimitResource, weight, lQueue.getMinimumAllocation());
523
524 if (user != null) {
525 user.setUserResourceLimit(userSpecificUserLimit);
526 }
527
528 LOG.debug("userLimit is fetched. userLimit={}, userSpecificUserLimit={},"
529 + " schedulingMode={}, partition={}", userLimitResource,
530 userSpecificUserLimit, schedulingMode, nodePartition);
531
532 return userSpecificUserLimit;
533 }
534
535 /**
536 * Get computed user-limit for all users in this queue. If cached data is
537 * invalidated due to resource change, this method also enforce to recompute
538 * user-limit.
539 *
540 * @param userName
541 * Name of user who has submitted one/more app to given queue.
542 * @param clusterResource
543 * total cluster resource
544 * @param nodePartition
545 * partition name
546 * @param schedulingMode
547 * scheduling mode
548 * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
549 * @return Computed User Limit
550 */
551 public Resource getComputedResourceLimitForAllUsers(String userName,
552 Resource clusterResource, String nodePartition,
553 SchedulingMode schedulingMode) {
554
555 Map<SchedulingMode, Resource> userLimitPerSchedulingMode;
556
557 writeLock.lock();
558 try {
559 userLimitPerSchedulingMode = preComputedAllUserLimit.get(nodePartition);
560 if (isRecomputeNeeded(schedulingMode, nodePartition, false)) {
561 // recompute
562 userLimitPerSchedulingMode = reComputeUserLimits(userName,
563 nodePartition, clusterResource, schedulingMode, false);
564
565 // update user count to cache so that we can avoid recompute if no major
566 // changes.
567 setLocalVersionOfUsersState(nodePartition, schedulingMode, false);
568 }
569 } finally {
570 writeLock.unlock();
571 }
572
573 Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
574 User user = getUser(userName);
575 float weight = (user == null) ? 1.0f : user.getWeight();
576 Resource userSpecificUserLimit =
577 Resources.multiplyAndNormalizeDown(resourceCalculator,
578 userLimitResource, weight, lQueue.getMinimumAllocation());
579
580 LOG.debug("userLimit is fetched. userLimit={}, userSpecificUserLimit={},"
581 + " schedulingMode={}, partition={}", userLimitResource,
582 userSpecificUserLimit, schedulingMode, nodePartition);
583
584 return userSpecificUserLimit;
585 }
586
587 protected long getLatestVersionOfUsersState() {
588 readLock.lock();
589 try {
590 return latestVersionOfUsersState;
591 } finally {
592 readLock.unlock();
593 }
594 }
595
596 /*
597 * Recompute user-limit under following conditions: 1. cached user-limit does
598 * not exist in local map. 2. Total User count doesn't match with local cached
599 * version.
600 */
601 private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
602 String nodePartition, boolean isActive) {
603 readLock.lock();
604 try {
605 return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
606 isActive) != latestVersionOfUsersState);
607 } finally {
608 readLock.unlock();
609 }
610 }
611
612 /*
613 * Set Local version of user count per label to invalidate cache if needed.
614 */
615 private void setLocalVersionOfUsersState(String nodePartition,
616 SchedulingMode schedulingMode, boolean isActive) {
617 writeLock.lock();
618 try {
619 Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
620 ? localVersionOfActiveUsersState
621 : localVersionOfAllUsersState;
622
623 Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
624 .get(nodePartition);
625 if (null == localVersion) {
626 localVersion = new HashMap<SchedulingMode, Long>();
627 localVersionOfUsersState.put(nodePartition, localVersion);
628 }
629
630 localVersion.put(schedulingMode, latestVersionOfUsersState);
631 } finally {
632 writeLock.unlock();
633 }
634 }
635
636 /*
637 * Get Local version of user count per label to invalidate cache if needed.
638 */
639 private long getLocalVersionOfUsersState(String nodePartition,
640 SchedulingMode schedulingMode, boolean isActive) {
641 this.readLock.lock();
642 try {
643 Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
644 ? localVersionOfActiveUsersState
645 : localVersionOfAllUsersState;
646
647 if (!localVersionOfUsersState.containsKey(nodePartition)) {
648 return -1;
649 }
650
651 Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
652 .get(nodePartition);
653 if (!localVersion.containsKey(schedulingMode)) {
654 return -1;
655 }
656
657 return localVersion.get(schedulingMode);
658 } finally {
659 readLock.unlock();
660 }
661 }
662
663 private Map<SchedulingMode, Resource> reComputeUserLimits(String userName,
664 String nodePartition, Resource clusterResource,
665 SchedulingMode schedulingMode, boolean activeMode) {
666
667 // preselect stored map as per active user-limit or all user computation.
668 Map<String, Map<SchedulingMode, Resource>> computedMap = null;
669 computedMap = (activeMode)
670 ? preComputedActiveUserLimit
671 : preComputedAllUserLimit;
672
673 Map<SchedulingMode, Resource> userLimitPerSchedulingMode = computedMap
674 .get(nodePartition);
675
676 if (userLimitPerSchedulingMode == null) {
677 userLimitPerSchedulingMode = new ConcurrentHashMap<>();
678 computedMap.put(nodePartition, userLimitPerSchedulingMode);
679 }
680
681 // compute user-limit per scheduling mode.
682 Resource computedUserLimit = computeUserLimit(userName, clusterResource,
683 nodePartition, schedulingMode, activeMode);
684
685 // update in local storage
686 userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
687
688 computeNumActiveUsersWithOnlyPendingApps();
689
690 return userLimitPerSchedulingMode;
691 }
692
693 // This method is called within the lock.
694 private void computeNumActiveUsersWithOnlyPendingApps() {
695 int numPendingUsers = 0;
696 for (User user : users.values()) {
697 if ((user.getPendingApplications() > 0)
698 && (user.getActiveApplications() <= 0)) {
699 numPendingUsers++;
700 }
701 }
702 activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers);
703 }
704
705 @VisibleForTesting
706 Resource computeUserLimit(String userName, Resource clusterResource,
707 String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
708 Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
709 clusterResource);
710
711 /*
712 * What is our current capacity?
713 * * It is equal to the max(required, queue-capacity) if we're running
714 * below capacity. The 'max' ensures that jobs in queues with miniscule
715 * capacity (< 1 slot) make progress
716 * * If we're running over capacity, then its (usedResources + required)
717 * (which extra resources we are allocating)
718 */
719 Resource queueCapacity = lQueue.getEffectiveCapacity(nodePartition);
720 Resource originalCapacity = queueCapacity;
721
722 /*
723 * Assume we have required resource equals to minimumAllocation, this can
724 * make sure user limit can continuously increase till queueMaxResource
725 * reached.
726 */
727 Resource required = lQueue.getMinimumAllocation();
728
729 // Allow progress for queues with miniscule capacity
730 queueCapacity = Resources.max(resourceCalculator, partitionResource,
731 queueCapacity, required);
732
733 /*
734 * We want to base the userLimit calculation on
735 * max(queueCapacity, usedResources+required). However, we want
736 * usedResources to be based on the combined ratios of all the users in the
737 * queue so we use consumedRatio to calculate such.
738 * The calculation is dependent on how the resourceCalculator calculates the
739 * ratio between two Resources. DRF Example: If usedResources is greater
740 * than queueCapacity and users have the following [mem,cpu] usages:
741 *
742 * User1: [10%,20%] - Dominant resource is 20%
743 * User2: [30%,10%] - Dominant resource is 30%
744 * Then total consumedRatio is then 20+30=50%. Yes, this value can be
745 * larger than 100% but for the purposes of making sure all users are
746 * getting their fair share, it works.
747 */
748 Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
749 partitionResource, getUsageRatio(nodePartition),
750 lQueue.getMinimumAllocation());
751 Resource currentCapacity = Resources.lessThan(resourceCalculator,
752 partitionResource, consumed, queueCapacity)
753 ? queueCapacity
754 : Resources.add(consumed, required);
755
756 /*
757 * Never allow a single user to take more than the queue's configured
758 * capacity * user-limit-factor. Also, the queue's configured capacity
759 * should be higher than queue-hard-limit * ulMin
760 */
761 float usersSummedByWeight = activeUsersTimesWeights;
762 Resource resourceUsed = Resources.add(
763 totalResUsageForActiveUsers.getUsed(nodePartition),
764 required);
765
766 // For non-activeUser calculation, consider all users count.
767 if (!activeUser) {
768 resourceUsed = currentCapacity;
769 usersSummedByWeight = allUsersTimesWeights;
770 }
771
772 /*
773 * User limit resource is determined by: max(currentCapacity / #activeUsers,
774 * currentCapacity * user-limit-percentage%)
775 */
776 Resource userLimitResource = Resources.max(resourceCalculator,
777 partitionResource,
778 Resources.divideAndCeil(resourceCalculator, resourceUsed,
779 usersSummedByWeight),
780 Resources.divideAndCeil(resourceCalculator,
781 Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
782 100));
783
784 // User limit is capped by maxUserLimit
785 // - maxUserLimit = queueCapacity * user-limit-factor
786 // (RESPECT_PARTITION_EXCLUSIVITY)
787 // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
788 //
789 // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
790 // partition, its guaranteed resource on that partition is 0. And
791 // user-limit-factor computation is based on queue's guaranteed capacity. So
792 // we will not cap user-limit as well as used resource when doing
793 // IGNORE_PARTITION_EXCLUSIVITY allocation.
794 Resource maxUserLimit = Resources.none();
795 if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
796 if (getUserLimitFactor() == -1 ||
797 originalCapacity.equals(Resources.none())) {
798 // If user-limit-factor set to -1, we should disable user limit.
799 //
800 // Also prevent incorrect maxUserLimit due to low queueCapacity
801 // Can happen if dynamic queue has capacity = 0%
802 maxUserLimit = lQueue.
803 getEffectiveMaxCapacityDown(
804 nodePartition, lQueue.getMinimumAllocation());
805 } else {
806 maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
807 getUserLimitFactor());
808 }
809 } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
810 maxUserLimit = partitionResource;
811 }
812
813 // Cap final user limit with maxUserLimit
814 userLimitResource = Resources
815 .roundUp(resourceCalculator,
816 Resources.min(resourceCalculator, partitionResource,
817 userLimitResource, maxUserLimit),
818 lQueue.getMinimumAllocation());
819
820 if (LOG.isDebugEnabled()) {
821 LOG.debug("User limit computation for " + userName
822 + ", in queue: " + lQueue.getQueuePath()
823 + ", userLimitPercent=" + lQueue.getUserLimit()
824 + ", userLimitFactor=" + lQueue.getUserLimitFactor()
825 + ", required=" + required
826 + ", consumed=" + consumed
827 + ", user-limit-resource=" + userLimitResource
828 + ", queueCapacity=" + queueCapacity
829 + ", qconsumed=" + lQueue.getQueueResourceUsage().getUsed()
830 + ", currentCapacity=" + currentCapacity
831 + ", activeUsers=" + usersSummedByWeight
832 + ", clusterCapacity=" + clusterResource
833 + ", resourceByLabel=" + partitionResource
834 + ", usageratio=" + getUsageRatio(nodePartition)
835 + ", Partition=" + nodePartition
836 + ", resourceUsed=" + resourceUsed
837 + ", maxUserLimit=" + maxUserLimit
838 + ", userWeight=" + getUser(userName).getWeight()
839 );
840 }
841 return userLimitResource;
842 }
843
844 /**
845 * Update new usage ratio.
846 *
847 * @param partition
848 * Node partition
849 * @param clusterResource
850 * Cluster Resource
851 */
852 public void updateUsageRatio(String partition, Resource clusterResource) {
853 writeLock.lock();
854 try {
855 Resource resourceByLabel = labelManager.getResourceByLabel(partition,
856 clusterResource);
857 float consumed = 0;
858 User user;
859 for (Map.Entry<String, User> entry : getUsers().entrySet()) {
860 user = entry.getValue();
861 consumed += user.setAndUpdateUsageRatio(resourceCalculator,
862 resourceByLabel, partition);
863 }
864
865 qUsageRatios.setUsageRatio(partition, consumed);
866 } finally {
867 writeLock.unlock();
868 }
869 }
870
871 /*
872 * Increment Queue Usage Ratio.
873 */
874 private void incQueueUsageRatio(String nodePartition, float delta) {
875 qUsageRatios.incUsageRatio(nodePartition, delta);
876 }
877
878 @Override
879 public void activateApplication(String user, ApplicationId applicationId) {
880
881 this.writeLock.lock();
882 try {
883 User userDesc = getUser(user);
884 if (userDesc != null && userDesc.getActiveApplications() <= 0) {
885 return;
886 }
887
888 Set<ApplicationId> userApps = usersApplications.get(user);
889 if (userApps == null) {
890 userApps = new HashSet<ApplicationId>();
891 usersApplications.put(user, userApps);
892 activeUsers.incrementAndGet();
893 metrics.incrActiveUsers();
894
895 // A user is added to active list. Invalidate user-limit cache.
896 userLimitNeedsRecompute();
897 updateActiveUsersResourceUsage(user);
898 LOG.debug("User {} added to activeUsers, currently: {}",
899 user, activeUsers);
900 }
901 if (userApps.add(applicationId)) {
902 metrics.activateApp(user);
903 }
904 } finally {
905 this.writeLock.unlock();
906 }
907 }
908
909 @Override
910 public void deactivateApplication(String user, ApplicationId applicationId) {
911
912 this.writeLock.lock();
913 try {
914 Set<ApplicationId> userApps = usersApplications.get(user);
915 if (userApps != null) {
916 if (userApps.remove(applicationId)) {
917 metrics.deactivateApp(user);
918 }
919 if (userApps.isEmpty()) {
920 usersApplications.remove(user);
921 activeUsers.decrementAndGet();
922 metrics.decrActiveUsers();
923
924 // A user is removed from active list. Invalidate user-limit cache.
925 userLimitNeedsRecompute();
926 updateNonActiveUsersResourceUsage(user);
927 LOG.debug("User {} removed from activeUsers, currently: {}",
928 user, activeUsers);
929 }
930 }
931 } finally {
932 this.writeLock.unlock();
933 }
934 }
935
936 @Override
937 public int getNumActiveUsers() {
938 return activeUsers.get() + activeUsersWithOnlyPendingApps.get();
939 }
940
941 float sumActiveUsersTimesWeights() {
942 float count = 0.0f;
943 this.readLock.lock();
944 try {
945 for (String u : activeUsersSet) {
946 count += getUser(u).getWeight();
947 }
948 return count;
949 } finally {
950 this.readLock.unlock();
951 }
952 }
953
954 float sumAllUsersTimesWeights() {
955 float count = 0.0f;
956 this.readLock.lock();
957 try {
958 for (String u : users.keySet()) {
959 count += getUser(u).getWeight();
960 }
961 return count;
962 } finally {
963 this.readLock.unlock();
964 }
965 }
966
967 private void updateActiveUsersResourceUsage(String userName) {
968 this.writeLock.lock();
969 try {
970 // For UT case: We might need to add the user to users list.
971 User user = getUserAndAddIfAbsent(userName);
972 ResourceUsage resourceUsage = user.getResourceUsage();
973 // If User is moved to active list, moved resource usage from non-active
974 // to active list.
975 if (nonActiveUsersSet.contains(userName)) {
976 nonActiveUsersSet.remove(userName);
977 activeUsersSet.add(userName);
978 activeUsersTimesWeights = sumActiveUsersTimesWeights();
979
980 // Update total resource usage of active and non-active after user
981 // is moved from non-active to active.
982 for (String partition : resourceUsage.getNodePartitionsSet()) {
983 totalResUsageForNonActiveUsers.decUsed(partition,
984 resourceUsage.getUsed(partition));
985 totalResUsageForActiveUsers.incUsed(partition,
986 resourceUsage.getUsed(partition));
987 }
988
989 if (LOG.isDebugEnabled()) {
990 LOG.debug("User '" + userName
991 + "' has become active. Hence move user to active list."
992 + "Active users size = " + activeUsersSet.size()
993 + "Non-active users size = " + nonActiveUsersSet.size()
994 + "Total Resource usage for active users="
995 + totalResUsageForActiveUsers.getAllUsed() + "."
996 + "Total Resource usage for non-active users="
997 + totalResUsageForNonActiveUsers.getAllUsed());
998 }
999 }
1000 } finally {
1001 this.writeLock.unlock();
1002 }
1003 }
1004
1005 private void updateNonActiveUsersResourceUsage(String userName) {
1006 this.writeLock.lock();
1007 try {
1008
1009 // For UT case: We might need to add the user to users list.
1010 User user = getUser(userName);
1011 if (user == null) return;
1012
1013 ResourceUsage resourceUsage = user.getResourceUsage();
1014 // If User is moved to non-active list, moved resource usage from
1015 // non-active to active list.
1016 if (activeUsersSet.contains(userName)) {
1017 activeUsersSet.remove(userName);
1018 nonActiveUsersSet.add(userName);
1019 activeUsersTimesWeights = sumActiveUsersTimesWeights();
1020
1021 // Update total resource usage of active and non-active after user is
1022 // moved from active to non-active.
1023 for (String partition : resourceUsage.getNodePartitionsSet()) {
1024 totalResUsageForActiveUsers.decUsed(partition,
1025 resourceUsage.getUsed(partition));
1026 totalResUsageForNonActiveUsers.incUsed(partition,
1027 resourceUsage.getUsed(partition));
1028
1029 if (LOG.isDebugEnabled()) {
1030 LOG.debug("User '" + userName
1031 + "' has become non-active.Hence move user to non-active list."
1032 + "Active users size = " + activeUsersSet.size()
1033 + "Non-active users size = " + nonActiveUsersSet.size()
1034 + "Total Resource usage for active users="
1035 + totalResUsageForActiveUsers.getAllUsed() + "."
1036 + "Total Resource usage for non-active users="
1037 + totalResUsageForNonActiveUsers.getAllUsed());
1038 }
1039 }
1040 }
1041 } finally {
1042 this.writeLock.unlock();
1043 }
1044 }
1045
1046 private ResourceUsage getTotalResourceUsagePerUser(String userName) {
1047 if (nonActiveUsersSet.contains(userName)) {
1048 return totalResUsageForNonActiveUsers;
1049 } else if (activeUsersSet.contains(userName)) {
1050 return totalResUsageForActiveUsers;
1051 } else {
1052 LOG.warn("User '" + userName
1053 + "' is not present in active/non-active. This is highly unlikely."
1054 + "We can consider this user in non-active list in this case.");
1055 return totalResUsageForNonActiveUsers;
1056 }
1057 }
1058
1059 /**
1060 * During container allocate/release, ensure that all user specific data
1061 * structures are updated.
1062 *
1063 * @param userName
1064 * Name of the user
1065 * @param resource
1066 * Resource to increment/decrement
1067 * @param nodePartition
1068 * Node label
1069 * @param isAllocate
1070 * Indicate whether to allocate or release resource
1071 * @return user
1072 */
1073 public User updateUserResourceUsage(String userName, Resource resource,
1074 String nodePartition, boolean isAllocate) {
1075 this.writeLock.lock();
1076 try {
1077
1078 // TODO, should use getUser, use this method just to avoid UT failure
1079 // which is caused by wrong invoking order, will fix UT separately
1080 User user = getUserAndAddIfAbsent(userName);
1081
1082 // New container is allocated. Invalidate user-limit.
1083 updateResourceUsagePerUser(user, resource, nodePartition, isAllocate);
1084
1085 userLimitNeedsRecompute();
1086
1087 // Update usage ratios
1088 Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
1089 scheduler.getClusterResource());
1090 incQueueUsageRatio(nodePartition, user.updateUsageRatio(
1091 resourceCalculator, resourceByLabel, nodePartition));
1092
1093 return user;
1094 } finally {
1095 this.writeLock.unlock();
1096 }
1097 }
1098
1099 private void updateResourceUsagePerUser(User user, Resource resource,
1100 String nodePartition, boolean isAllocate) {
1101 ResourceUsage totalResourceUsageForUsers = getTotalResourceUsagePerUser(
1102 user.userName);
1103
1104 if (isAllocate) {
1105 user.getResourceUsage().incUsed(nodePartition, resource);
1106 totalResourceUsageForUsers.incUsed(nodePartition, resource);
1107 } else {
1108 user.getResourceUsage().decUsed(nodePartition, resource);
1109 totalResourceUsageForUsers.decUsed(nodePartition, resource);
1110 }
1111
1112 if (LOG.isDebugEnabled()) {
1113 LOG.debug(
1114 "User resource is updated." + "Total Resource usage for active users="
1115 + totalResUsageForActiveUsers.getAllUsed() + "."
1116 + "Total Resource usage for non-active users="
1117 + totalResUsageForNonActiveUsers.getAllUsed());
1118 }
1119 }
1120
1121 public void updateUserWeights() {
1122 this.writeLock.lock();
1123 try {
1124 for (Map.Entry<String, User> ue : users.entrySet()) {
1125 ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
1126 }
1127 activeUsersTimesWeights = sumActiveUsersTimesWeights();
1128 allUsersTimesWeights = sumAllUsersTimesWeights();
1129 userLimitNeedsRecompute();
1130 } finally {
1131 this.writeLock.unlock();
1132 }
1133 }
1134
1135 @VisibleForTesting
1136 public int getNumActiveUsersWithOnlyPendingApps() {
1137 return activeUsersWithOnlyPendingApps.get();
1138 }
1139
1140 @VisibleForTesting
1141 void setUsageRatio(String label, float usage) {
1142 qUsageRatios.usageRatios.put(label, usage);
1143 }
1144 }