Coverage Summary for Class: UsersManager (org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity)

| Class                    | Method, %     | Line, %        |
|--------------------------|---------------|----------------|
| UsersManager             | 23.1% (9/39)  | 19.6% (70/358) |
| UsersManager$UsageRatios | 60% (3/5)     | 42.9% (12/28)  |
| UsersManager$User        | 0% (0/21)     | 0% (0/51)      |
| total                    | 18.5% (12/65) | 18.8% (82/437) |


1 /** 2  * Licensed to the Apache Software Foundation (ASF) under one 3  * or more contributor license agreements. See the NOTICE file 4  * distributed with this work for additional information 5  * regarding copyright ownership. The ASF licenses this file 6  * to you under the Apache License, Version 2.0 (the 7  * "License"); you may not use this file except in compliance 8  * with the License. You may obtain a copy of the License at 9  * 10  * http://www.apache.org/licenses/LICENSE-2.0 11  * 12  * Unless required by applicable law or agreed to in writing, software 13  * distributed under the License is distributed on an "AS IS" BASIS, 14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15  * See the License for the specific language governing permissions and 16  * limitations under the License. 17  */ 18 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; 19  20 import java.util.ArrayList; 21 import java.util.HashMap; 22 import java.util.HashSet; 23 import java.util.Map; 24 import java.util.Set; 25 import java.util.concurrent.ConcurrentHashMap; 26 import java.util.concurrent.atomic.AtomicInteger; 27 import java.util.concurrent.locks.ReentrantReadWriteLock; 28 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; 29 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; 30  31 import org.slf4j.Logger; 32 import org.slf4j.LoggerFactory; 33 import org.apache.hadoop.classification.InterfaceAudience.Private; 34 import org.apache.hadoop.yarn.api.records.ApplicationId; 35 import org.apache.hadoop.yarn.api.records.Resource; 36 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; 37 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager; 38 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; 39 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; 40 import 
org.apache.hadoop.yarn.util.resource.ResourceCalculator; 41 import org.apache.hadoop.yarn.util.resource.Resources; 42  43 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; 44  45 /** 46  * {@link UsersManager} tracks users in the system and its respective data 47  * structures. 48  */ 49 @Private 50 public class UsersManager implements AbstractUsersManager { 51  52  private static final Logger LOG = 53  LoggerFactory.getLogger(UsersManager.class); 54  55  /* 56  * Member declaration for UsersManager class. 57  */ 58  private final LeafQueue lQueue; 59  private final RMNodeLabelsManager labelManager; 60  private final ResourceCalculator resourceCalculator; 61  private final CapacitySchedulerContext scheduler; 62  private Map<String, User> users = new ConcurrentHashMap<>(); 63  64  private ResourceUsage totalResUsageForActiveUsers = new ResourceUsage(); 65  private ResourceUsage totalResUsageForNonActiveUsers = new ResourceUsage(); 66  private Set<String> activeUsersSet = new HashSet<String>(); 67  private Set<String> nonActiveUsersSet = new HashSet<String>(); 68  69  // Summation of consumed ratios for all users in queue 70  private UsageRatios qUsageRatios; 71  72  // To detect whether there is a change in user count for every user-limit 73  // calculation. 
74  private long latestVersionOfUsersState = 0; 75  private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState = 76  new HashMap<String, Map<SchedulingMode, Long>>(); 77  private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState = 78  new HashMap<String, Map<SchedulingMode, Long>>(); 79  80  private volatile float userLimit; 81  private volatile float userLimitFactor; 82  83  private WriteLock writeLock; 84  private ReadLock readLock; 85  86  private final QueueMetrics metrics; 87  private AtomicInteger activeUsers = new AtomicInteger(0); 88  private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0); 89  private Map<String, Set<ApplicationId>> usersApplications = 90  new HashMap<String, Set<ApplicationId>>(); 91  92  // Pre-computed list of user-limits. 93  @VisibleForTesting 94  Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = 95  new HashMap<>(); 96  @VisibleForTesting 97  Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = 98  new HashMap<>(); 99  100  private float activeUsersTimesWeights = 0.0f; 101  private float allUsersTimesWeights = 0.0f; 102  103  /** 104  * UsageRatios will store the total used resources ratio across all users of 105  * the queue. 
106  */ 107  static private class UsageRatios { 108  private Map<String, Float> usageRatios; 109  private ReadLock readLock; 110  private WriteLock writeLock; 111  112  public UsageRatios() { 113  ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 114  readLock = lock.readLock(); 115  writeLock = lock.writeLock(); 116  usageRatios = new HashMap<String, Float>(); 117  } 118  119  private void incUsageRatio(String label, float delta) { 120  writeLock.lock(); 121  try { 122  float usage = 0f; 123  if (usageRatios.containsKey(label)) { 124  usage = usageRatios.get(label); 125  } 126  usage += delta; 127  usageRatios.put(label, usage); 128  } finally { 129  writeLock.unlock(); 130  } 131  } 132  133  private float getUsageRatio(String label) { 134  readLock.lock(); 135  try { 136  Float f = usageRatios.get(label); 137  if (null == f) { 138  return 0.0f; 139  } 140  return f; 141  } finally { 142  readLock.unlock(); 143  } 144  } 145  146  private void setUsageRatio(String label, float ratio) { 147  writeLock.lock(); 148  try { 149  usageRatios.put(label, ratio); 150  } finally { 151  writeLock.unlock(); 152  } 153  } 154  } /* End of UserRatios class */ 155  156  /** 157  * User class stores all user related resource usage, application details. 
158  */ 159  @VisibleForTesting 160  public static class User { 161  ResourceUsage userResourceUsage = new ResourceUsage(); 162  String userName = null; 163  volatile Resource userResourceLimit = Resource.newInstance(0, 0); 164  private volatile AtomicInteger pendingApplications = new AtomicInteger(0); 165  private volatile AtomicInteger activeApplications = new AtomicInteger(0); 166  167  private UsageRatios userUsageRatios = new UsageRatios(); 168  private WriteLock writeLock; 169  private float weight; 170  171  public User(String name) { 172  ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 173  // Nobody uses read-lock now, will add it when necessary 174  writeLock = lock.writeLock(); 175  176  this.userName = name; 177  } 178  179  public ResourceUsage getResourceUsage() { 180  return userResourceUsage; 181  } 182  183  public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator, 184  Resource resource, String nodePartition) { 185  writeLock.lock(); 186  try { 187  userUsageRatios.setUsageRatio(nodePartition, 0); 188  return updateUsageRatio(resourceCalculator, resource, nodePartition); 189  } finally { 190  writeLock.unlock(); 191  } 192  } 193  194  public float updateUsageRatio(ResourceCalculator resourceCalculator, 195  Resource resource, String nodePartition) { 196  writeLock.lock(); 197  try { 198  float delta; 199  float newRatio = Resources.ratio(resourceCalculator, 200  getUsed(nodePartition), resource); 201  delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); 202  userUsageRatios.setUsageRatio(nodePartition, newRatio); 203  return delta; 204  } finally { 205  writeLock.unlock(); 206  } 207  } 208  209  public Resource getUsed() { 210  return userResourceUsage.getUsed(); 211  } 212  213  public Resource getAllUsed() { 214  return userResourceUsage.getAllUsed(); 215  } 216  217  public Resource getUsed(String label) { 218  return userResourceUsage.getUsed(label); 219  } 220  221  public int 
getPendingApplications() { 222  return pendingApplications.get(); 223  } 224  225  public int getActiveApplications() { 226  return activeApplications.get(); 227  } 228  229  public Resource getConsumedAMResources() { 230  return userResourceUsage.getAMUsed(); 231  } 232  233  public Resource getConsumedAMResources(String label) { 234  return userResourceUsage.getAMUsed(label); 235  } 236  237  public int getTotalApplications() { 238  return getPendingApplications() + getActiveApplications(); 239  } 240  241  public void submitApplication() { 242  pendingApplications.incrementAndGet(); 243  } 244  245  public void activateApplication() { 246  pendingApplications.decrementAndGet(); 247  activeApplications.incrementAndGet(); 248  } 249  250  public void finishApplication(boolean wasActive) { 251  if (wasActive) { 252  activeApplications.decrementAndGet(); 253  } else { 254  pendingApplications.decrementAndGet(); 255  } 256  } 257  258  public Resource getUserResourceLimit() { 259  return userResourceLimit; 260  } 261  262  public void setUserResourceLimit(Resource userResourceLimit) { 263  this.userResourceLimit = userResourceLimit; 264  } 265  266  public String getUserName() { 267  return this.userName; 268  } 269  270  @VisibleForTesting 271  public void setResourceUsage(ResourceUsage resourceUsage) { 272  this.userResourceUsage = resourceUsage; 273  } 274  275  /** 276  * @return the weight 277  */ 278  public float getWeight() { 279  return weight; 280  } 281  282  /** 283  * @param weight the weight to set 284  */ 285  public void setWeight(float weight) { 286  this.weight = weight; 287  } 288  } /* End of User class */ 289  290  /** 291  * UsersManager Constructor. 
292  * 293  * @param metrics 294  * Queue Metrics 295  * @param lQueue 296  * Leaf Queue Object 297  * @param labelManager 298  * Label Manager instance 299  * @param scheduler 300  * Capacity Scheduler Context 301  * @param resourceCalculator 302  * rc 303  */ 304  public UsersManager(QueueMetrics metrics, LeafQueue lQueue, 305  RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler, 306  ResourceCalculator resourceCalculator) { 307  ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 308  this.lQueue = lQueue; 309  this.scheduler = scheduler; 310  this.labelManager = labelManager; 311  this.resourceCalculator = resourceCalculator; 312  this.qUsageRatios = new UsageRatios(); 313  this.metrics = metrics; 314  315  this.writeLock = lock.writeLock(); 316  this.readLock = lock.readLock(); 317  } 318  319  /** 320  * Get configured user-limit. 321  * @return user limit 322  */ 323  public float getUserLimit() { 324  return userLimit; 325  } 326  327  /** 328  * Set configured user-limit. 329  * @param userLimit user limit 330  */ 331  public void setUserLimit(float userLimit) { 332  this.userLimit = userLimit; 333  } 334  335  /** 336  * Get configured user-limit factor. 337  * @return user-limit factor 338  */ 339  public float getUserLimitFactor() { 340  return userLimitFactor; 341  } 342  343  /** 344  * Set configured user-limit factor. 345  * @param userLimitFactor User Limit factor. 346  */ 347  public void setUserLimitFactor(float userLimitFactor) { 348  this.userLimitFactor = userLimitFactor; 349  } 350  351  @VisibleForTesting 352  public float getUsageRatio(String label) { 353  return qUsageRatios.getUsageRatio(label); 354  } 355  356  /** 357  * Force UsersManager to recompute userlimit. 358  */ 359  public void userLimitNeedsRecompute() { 360  361  // If latestVersionOfUsersState is negative due to overflow, ideally we need 362  // to reset it. 
This method is invoked from UsersManager and LeafQueue and 363  // all is happening within write/readLock. Below logic can help to set 0. 364  writeLock.lock(); 365  try { 366  367  long value = ++latestVersionOfUsersState; 368  if (value < 0) { 369  latestVersionOfUsersState = 0; 370  } 371  } finally { 372  writeLock.unlock(); 373  } 374  } 375  376  /* 377  * Get all users of queue. 378  */ 379  public Map<String, User> getUsers() { 380  return users; 381  } 382  383  /** 384  * Get user object for given user name. 385  * 386  * @param userName 387  * User Name 388  * @return User object 389  */ 390  public User getUser(String userName) { 391  return users.get(userName); 392  } 393  394  /** 395  * Remove user. 396  * 397  * @param userName 398  * User Name 399  */ 400  public void removeUser(String userName) { 401  writeLock.lock(); 402  try { 403  this.users.remove(userName); 404  405  // Remove user from active/non-active list as well. 406  activeUsersSet.remove(userName); 407  nonActiveUsersSet.remove(userName); 408  activeUsersTimesWeights = sumActiveUsersTimesWeights(); 409  allUsersTimesWeights = sumAllUsersTimesWeights(); 410  } finally { 411  writeLock.unlock(); 412  } 413  } 414  415  /** 416  * Get and add user if absent. 
417  * 418  * @param userName 419  * User Name 420  * @return User object 421  */ 422  public User getUserAndAddIfAbsent(String userName) { 423  writeLock.lock(); 424  try { 425  User u = getUser(userName); 426  if (null == u) { 427  u = new User(userName); 428  addUser(userName, u); 429  430  // Add to nonActive list so that resourceUsage could be tracked 431  if (!nonActiveUsersSet.contains(userName)) { 432  nonActiveUsersSet.add(userName); 433  } 434  } 435  return u; 436  } finally { 437  writeLock.unlock(); 438  } 439  } 440  441  /* 442  * Add a new user 443  */ 444  private void addUser(String userName, User user) { 445  this.users.put(userName, user); 446  user.setWeight(getUserWeightFromQueue(userName)); 447  allUsersTimesWeights = sumAllUsersTimesWeights(); 448  } 449  450  /** 451  * @return an ArrayList of UserInfo objects who are active in this queue 452  */ 453  public ArrayList<UserInfo> getUsersInfo() { 454  readLock.lock(); 455  try { 456  ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>(); 457  for (Map.Entry<String, User> entry : getUsers().entrySet()) { 458  User user = entry.getValue(); 459  usersToReturn.add( 460  new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()), 461  user.getActiveApplications(), user.getPendingApplications(), 462  Resources.clone(user.getConsumedAMResources()), 463  Resources.clone(user.getUserResourceLimit()), 464  user.getResourceUsage(), user.getWeight(), 465  activeUsersSet.contains(user.userName))); 466  } 467  return usersToReturn; 468  } finally { 469  readLock.unlock(); 470  } 471  } 472  473  private float getUserWeightFromQueue(String userName) { 474  Float weight = lQueue.getUserWeights().get(userName); 475  return (weight == null) ? 1.0f : weight.floatValue(); 476  } 477  478  /** 479  * Get computed user-limit for all ACTIVE users in this queue. If cached data 480  * is invalidated due to resource change, this method also enforce to 481  * recompute user-limit. 
482  * 483  * @param userName 484  * Name of user who has submitted one/more app to given queue. 485  * @param clusterResource 486  * total cluster resource 487  * @param nodePartition 488  * partition name 489  * @param schedulingMode 490  * scheduling mode 491  * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY 492  * @return Computed User Limit 493  */ 494  public Resource getComputedResourceLimitForActiveUsers(String userName, 495  Resource clusterResource, String nodePartition, 496  SchedulingMode schedulingMode) { 497  498  Map<SchedulingMode, Resource> userLimitPerSchedulingMode; 499  500  writeLock.lock(); 501  try { 502  userLimitPerSchedulingMode = 503  preComputedActiveUserLimit.get(nodePartition); 504  if (isRecomputeNeeded(schedulingMode, nodePartition, true)) { 505  // recompute 506  userLimitPerSchedulingMode = reComputeUserLimits(userName, 507  nodePartition, clusterResource, schedulingMode, true); 508  509  // update user count to cache so that we can avoid recompute if no major 510  // changes. 511  setLocalVersionOfUsersState(nodePartition, schedulingMode, true); 512  } 513  } finally { 514  writeLock.unlock(); 515  } 516  517  Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode); 518  User user = getUser(userName); 519  float weight = (user == null) ? 1.0f : user.getWeight(); 520  Resource userSpecificUserLimit = 521  Resources.multiplyAndNormalizeDown(resourceCalculator, 522  userLimitResource, weight, lQueue.getMinimumAllocation()); 523  524  if (user != null) { 525  user.setUserResourceLimit(userSpecificUserLimit); 526  } 527  528  LOG.debug("userLimit is fetched. userLimit={}, userSpecificUserLimit={}," 529  + " schedulingMode={}, partition={}", userLimitResource, 530  userSpecificUserLimit, schedulingMode, nodePartition); 531  532  return userSpecificUserLimit; 533  } 534  535  /** 536  * Get computed user-limit for all users in this queue. 
If cached data is 537  * invalidated due to resource change, this method also enforce to recompute 538  * user-limit. 539  * 540  * @param userName 541  * Name of user who has submitted one/more app to given queue. 542  * @param clusterResource 543  * total cluster resource 544  * @param nodePartition 545  * partition name 546  * @param schedulingMode 547  * scheduling mode 548  * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY 549  * @return Computed User Limit 550  */ 551  public Resource getComputedResourceLimitForAllUsers(String userName, 552  Resource clusterResource, String nodePartition, 553  SchedulingMode schedulingMode) { 554  555  Map<SchedulingMode, Resource> userLimitPerSchedulingMode; 556  557  writeLock.lock(); 558  try { 559  userLimitPerSchedulingMode = preComputedAllUserLimit.get(nodePartition); 560  if (isRecomputeNeeded(schedulingMode, nodePartition, false)) { 561  // recompute 562  userLimitPerSchedulingMode = reComputeUserLimits(userName, 563  nodePartition, clusterResource, schedulingMode, false); 564  565  // update user count to cache so that we can avoid recompute if no major 566  // changes. 567  setLocalVersionOfUsersState(nodePartition, schedulingMode, false); 568  } 569  } finally { 570  writeLock.unlock(); 571  } 572  573  Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode); 574  User user = getUser(userName); 575  float weight = (user == null) ? 1.0f : user.getWeight(); 576  Resource userSpecificUserLimit = 577  Resources.multiplyAndNormalizeDown(resourceCalculator, 578  userLimitResource, weight, lQueue.getMinimumAllocation()); 579  580  LOG.debug("userLimit is fetched. 
userLimit={}, userSpecificUserLimit={}," 581  + " schedulingMode={}, partition={}", userLimitResource, 582  userSpecificUserLimit, schedulingMode, nodePartition); 583  584  return userSpecificUserLimit; 585  } 586  587  protected long getLatestVersionOfUsersState() { 588  readLock.lock(); 589  try { 590  return latestVersionOfUsersState; 591  } finally { 592  readLock.unlock(); 593  } 594  } 595  596  /* 597  * Recompute user-limit under following conditions: 1. cached user-limit does 598  * not exist in local map. 2. Total User count doesn't match with local cached 599  * version. 600  */ 601  private boolean isRecomputeNeeded(SchedulingMode schedulingMode, 602  String nodePartition, boolean isActive) { 603  readLock.lock(); 604  try { 605  return (getLocalVersionOfUsersState(nodePartition, schedulingMode, 606  isActive) != latestVersionOfUsersState); 607  } finally { 608  readLock.unlock(); 609  } 610  } 611  612  /* 613  * Set Local version of user count per label to invalidate cache if needed. 614  */ 615  private void setLocalVersionOfUsersState(String nodePartition, 616  SchedulingMode schedulingMode, boolean isActive) { 617  writeLock.lock(); 618  try { 619  Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive) 620  ? localVersionOfActiveUsersState 621  : localVersionOfAllUsersState; 622  623  Map<SchedulingMode, Long> localVersion = localVersionOfUsersState 624  .get(nodePartition); 625  if (null == localVersion) { 626  localVersion = new HashMap<SchedulingMode, Long>(); 627  localVersionOfUsersState.put(nodePartition, localVersion); 628  } 629  630  localVersion.put(schedulingMode, latestVersionOfUsersState); 631  } finally { 632  writeLock.unlock(); 633  } 634  } 635  636  /* 637  * Get Local version of user count per label to invalidate cache if needed. 
638  */ 639  private long getLocalVersionOfUsersState(String nodePartition, 640  SchedulingMode schedulingMode, boolean isActive) { 641  this.readLock.lock(); 642  try { 643  Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive) 644  ? localVersionOfActiveUsersState 645  : localVersionOfAllUsersState; 646  647  if (!localVersionOfUsersState.containsKey(nodePartition)) { 648  return -1; 649  } 650  651  Map<SchedulingMode, Long> localVersion = localVersionOfUsersState 652  .get(nodePartition); 653  if (!localVersion.containsKey(schedulingMode)) { 654  return -1; 655  } 656  657  return localVersion.get(schedulingMode); 658  } finally { 659  readLock.unlock(); 660  } 661  } 662  663  private Map<SchedulingMode, Resource> reComputeUserLimits(String userName, 664  String nodePartition, Resource clusterResource, 665  SchedulingMode schedulingMode, boolean activeMode) { 666  667  // preselect stored map as per active user-limit or all user computation. 668  Map<String, Map<SchedulingMode, Resource>> computedMap = null; 669  computedMap = (activeMode) 670  ? preComputedActiveUserLimit 671  : preComputedAllUserLimit; 672  673  Map<SchedulingMode, Resource> userLimitPerSchedulingMode = computedMap 674  .get(nodePartition); 675  676  if (userLimitPerSchedulingMode == null) { 677  userLimitPerSchedulingMode = new ConcurrentHashMap<>(); 678  computedMap.put(nodePartition, userLimitPerSchedulingMode); 679  } 680  681  // compute user-limit per scheduling mode. 682  Resource computedUserLimit = computeUserLimit(userName, clusterResource, 683  nodePartition, schedulingMode, activeMode); 684  685  // update in local storage 686  userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit); 687  688  computeNumActiveUsersWithOnlyPendingApps(); 689  690  return userLimitPerSchedulingMode; 691  } 692  693  // This method is called within the lock. 
694  private void computeNumActiveUsersWithOnlyPendingApps() { 695  int numPendingUsers = 0; 696  for (User user : users.values()) { 697  if ((user.getPendingApplications() > 0) 698  && (user.getActiveApplications() <= 0)) { 699  numPendingUsers++; 700  } 701  } 702  activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers); 703  } 704  705  @VisibleForTesting 706  Resource computeUserLimit(String userName, Resource clusterResource, 707  String nodePartition, SchedulingMode schedulingMode, boolean activeUser) { 708  Resource partitionResource = labelManager.getResourceByLabel(nodePartition, 709  clusterResource); 710  711  /* 712  * What is our current capacity? 713  * * It is equal to the max(required, queue-capacity) if we're running 714  * below capacity. The 'max' ensures that jobs in queues with miniscule 715  * capacity (< 1 slot) make progress 716  * * If we're running over capacity, then its (usedResources + required) 717  * (which extra resources we are allocating) 718  */ 719  Resource queueCapacity = lQueue.getEffectiveCapacity(nodePartition); 720  Resource originalCapacity = queueCapacity; 721  722  /* 723  * Assume we have required resource equals to minimumAllocation, this can 724  * make sure user limit can continuously increase till queueMaxResource 725  * reached. 726  */ 727  Resource required = lQueue.getMinimumAllocation(); 728  729  // Allow progress for queues with miniscule capacity 730  queueCapacity = Resources.max(resourceCalculator, partitionResource, 731  queueCapacity, required); 732  733  /* 734  * We want to base the userLimit calculation on 735  * max(queueCapacity, usedResources+required). However, we want 736  * usedResources to be based on the combined ratios of all the users in the 737  * queue so we use consumedRatio to calculate such. 738  * The calculation is dependent on how the resourceCalculator calculates the 739  * ratio between two Resources. 
DRF Example: If usedResources is greater 740  * than queueCapacity and users have the following [mem,cpu] usages: 741  * 742  * User1: [10%,20%] - Dominant resource is 20% 743  * User2: [30%,10%] - Dominant resource is 30% 744  * Then total consumedRatio is then 20+30=50%. Yes, this value can be 745  * larger than 100% but for the purposes of making sure all users are 746  * getting their fair share, it works. 747  */ 748  Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator, 749  partitionResource, getUsageRatio(nodePartition), 750  lQueue.getMinimumAllocation()); 751  Resource currentCapacity = Resources.lessThan(resourceCalculator, 752  partitionResource, consumed, queueCapacity) 753  ? queueCapacity 754  : Resources.add(consumed, required); 755  756  /* 757  * Never allow a single user to take more than the queue's configured 758  * capacity * user-limit-factor. Also, the queue's configured capacity 759  * should be higher than queue-hard-limit * ulMin 760  */ 761  float usersSummedByWeight = activeUsersTimesWeights; 762  Resource resourceUsed = Resources.add( 763  totalResUsageForActiveUsers.getUsed(nodePartition), 764  required); 765  766  // For non-activeUser calculation, consider all users count. 
767  if (!activeUser) { 768  resourceUsed = currentCapacity; 769  usersSummedByWeight = allUsersTimesWeights; 770  } 771  772  /* 773  * User limit resource is determined by: max(currentCapacity / #activeUsers, 774  * currentCapacity * user-limit-percentage%) 775  */ 776  Resource userLimitResource = Resources.max(resourceCalculator, 777  partitionResource, 778  Resources.divideAndCeil(resourceCalculator, resourceUsed, 779  usersSummedByWeight), 780  Resources.divideAndCeil(resourceCalculator, 781  Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()), 782  100)); 783  784  // User limit is capped by maxUserLimit 785  // - maxUserLimit = queueCapacity * user-limit-factor 786  // (RESPECT_PARTITION_EXCLUSIVITY) 787  // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY) 788  // 789  // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a 790  // partition, its guaranteed resource on that partition is 0. And 791  // user-limit-factor computation is based on queue's guaranteed capacity. So 792  // we will not cap user-limit as well as used resource when doing 793  // IGNORE_PARTITION_EXCLUSIVITY allocation. 794  Resource maxUserLimit = Resources.none(); 795  if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { 796  if (getUserLimitFactor() == -1 || 797  originalCapacity.equals(Resources.none())) { 798  // If user-limit-factor set to -1, we should disable user limit. 799  // 800  // Also prevent incorrect maxUserLimit due to low queueCapacity 801  // Can happen if dynamic queue has capacity = 0% 802  maxUserLimit = lQueue. 
803  getEffectiveMaxCapacityDown( 804  nodePartition, lQueue.getMinimumAllocation()); 805  } else { 806  maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity, 807  getUserLimitFactor()); 808  } 809  } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { 810  maxUserLimit = partitionResource; 811  } 812  813  // Cap final user limit with maxUserLimit 814  userLimitResource = Resources 815  .roundUp(resourceCalculator, 816  Resources.min(resourceCalculator, partitionResource, 817  userLimitResource, maxUserLimit), 818  lQueue.getMinimumAllocation()); 819  820  if (LOG.isDebugEnabled()) { 821  LOG.debug("User limit computation for " + userName 822  + ", in queue: " + lQueue.getQueuePath() 823  + ", userLimitPercent=" + lQueue.getUserLimit() 824  + ", userLimitFactor=" + lQueue.getUserLimitFactor() 825  + ", required=" + required 826  + ", consumed=" + consumed 827  + ", user-limit-resource=" + userLimitResource 828  + ", queueCapacity=" + queueCapacity 829  + ", qconsumed=" + lQueue.getQueueResourceUsage().getUsed() 830  + ", currentCapacity=" + currentCapacity 831  + ", activeUsers=" + usersSummedByWeight 832  + ", clusterCapacity=" + clusterResource 833  + ", resourceByLabel=" + partitionResource 834  + ", usageratio=" + getUsageRatio(nodePartition) 835  + ", Partition=" + nodePartition 836  + ", resourceUsed=" + resourceUsed 837  + ", maxUserLimit=" + maxUserLimit 838  + ", userWeight=" + getUser(userName).getWeight() 839  ); 840  } 841  return userLimitResource; 842  } 843  844  /** 845  * Update new usage ratio. 
846  * 847  * @param partition 848  * Node partition 849  * @param clusterResource 850  * Cluster Resource 851  */ 852  public void updateUsageRatio(String partition, Resource clusterResource) { 853  writeLock.lock(); 854  try { 855  Resource resourceByLabel = labelManager.getResourceByLabel(partition, 856  clusterResource); 857  float consumed = 0; 858  User user; 859  for (Map.Entry<String, User> entry : getUsers().entrySet()) { 860  user = entry.getValue(); 861  consumed += user.setAndUpdateUsageRatio(resourceCalculator, 862  resourceByLabel, partition); 863  } 864  865  qUsageRatios.setUsageRatio(partition, consumed); 866  } finally { 867  writeLock.unlock(); 868  } 869  } 870  871  /* 872  * Increment Queue Usage Ratio. 873  */ 874  private void incQueueUsageRatio(String nodePartition, float delta) { 875  qUsageRatios.incUsageRatio(nodePartition, delta); 876  } 877  878  @Override 879  public void activateApplication(String user, ApplicationId applicationId) { 880  881  this.writeLock.lock(); 882  try { 883  User userDesc = getUser(user); 884  if (userDesc != null && userDesc.getActiveApplications() <= 0) { 885  return; 886  } 887  888  Set<ApplicationId> userApps = usersApplications.get(user); 889  if (userApps == null) { 890  userApps = new HashSet<ApplicationId>(); 891  usersApplications.put(user, userApps); 892  activeUsers.incrementAndGet(); 893  metrics.incrActiveUsers(); 894  895  // A user is added to active list. Invalidate user-limit cache. 
896  userLimitNeedsRecompute(); 897  updateActiveUsersResourceUsage(user); 898  LOG.debug("User {} added to activeUsers, currently: {}", 899  user, activeUsers); 900  } 901  if (userApps.add(applicationId)) { 902  metrics.activateApp(user); 903  } 904  } finally { 905  this.writeLock.unlock(); 906  } 907  } 908  909  @Override 910  public void deactivateApplication(String user, ApplicationId applicationId) { 911  912  this.writeLock.lock(); 913  try { 914  Set<ApplicationId> userApps = usersApplications.get(user); 915  if (userApps != null) { 916  if (userApps.remove(applicationId)) { 917  metrics.deactivateApp(user); 918  } 919  if (userApps.isEmpty()) { 920  usersApplications.remove(user); 921  activeUsers.decrementAndGet(); 922  metrics.decrActiveUsers(); 923  924  // A user is removed from active list. Invalidate user-limit cache. 925  userLimitNeedsRecompute(); 926  updateNonActiveUsersResourceUsage(user); 927  LOG.debug("User {} removed from activeUsers, currently: {}", 928  user, activeUsers); 929  } 930  } 931  } finally { 932  this.writeLock.unlock(); 933  } 934  } 935  936  @Override 937  public int getNumActiveUsers() { 938  return activeUsers.get() + activeUsersWithOnlyPendingApps.get(); 939  } 940  941  float sumActiveUsersTimesWeights() { 942  float count = 0.0f; 943  this.readLock.lock(); 944  try { 945  for (String u : activeUsersSet) { 946  count += getUser(u).getWeight(); 947  } 948  return count; 949  } finally { 950  this.readLock.unlock(); 951  } 952  } 953  954  float sumAllUsersTimesWeights() { 955  float count = 0.0f; 956  this.readLock.lock(); 957  try { 958  for (String u : users.keySet()) { 959  count += getUser(u).getWeight(); 960  } 961  return count; 962  } finally { 963  this.readLock.unlock(); 964  } 965  } 966  967  private void updateActiveUsersResourceUsage(String userName) { 968  this.writeLock.lock(); 969  try { 970  // For UT case: We might need to add the user to users list. 
971  User user = getUserAndAddIfAbsent(userName); 972  ResourceUsage resourceUsage = user.getResourceUsage(); 973  // If User is moved to active list, moved resource usage from non-active 974  // to active list. 975  if (nonActiveUsersSet.contains(userName)) { 976  nonActiveUsersSet.remove(userName); 977  activeUsersSet.add(userName); 978  activeUsersTimesWeights = sumActiveUsersTimesWeights(); 979  980  // Update total resource usage of active and non-active after user 981  // is moved from non-active to active. 982  for (String partition : resourceUsage.getNodePartitionsSet()) { 983  totalResUsageForNonActiveUsers.decUsed(partition, 984  resourceUsage.getUsed(partition)); 985  totalResUsageForActiveUsers.incUsed(partition, 986  resourceUsage.getUsed(partition)); 987  } 988  989  if (LOG.isDebugEnabled()) { 990  LOG.debug("User '" + userName 991  + "' has become active. Hence move user to active list." 992  + "Active users size = " + activeUsersSet.size() 993  + "Non-active users size = " + nonActiveUsersSet.size() 994  + "Total Resource usage for active users=" 995  + totalResUsageForActiveUsers.getAllUsed() + "." 996  + "Total Resource usage for non-active users=" 997  + totalResUsageForNonActiveUsers.getAllUsed()); 998  } 999  } 1000  } finally { 1001  this.writeLock.unlock(); 1002  } 1003  } 1004  1005  private void updateNonActiveUsersResourceUsage(String userName) { 1006  this.writeLock.lock(); 1007  try { 1008  1009  // For UT case: We might need to add the user to users list. 1010  User user = getUser(userName); 1011  if (user == null) return; 1012  1013  ResourceUsage resourceUsage = user.getResourceUsage(); 1014  // If User is moved to non-active list, moved resource usage from 1015  // non-active to active list. 
1016  if (activeUsersSet.contains(userName)) { 1017  activeUsersSet.remove(userName); 1018  nonActiveUsersSet.add(userName); 1019  activeUsersTimesWeights = sumActiveUsersTimesWeights(); 1020  1021  // Update total resource usage of active and non-active after user is 1022  // moved from active to non-active. 1023  for (String partition : resourceUsage.getNodePartitionsSet()) { 1024  totalResUsageForActiveUsers.decUsed(partition, 1025  resourceUsage.getUsed(partition)); 1026  totalResUsageForNonActiveUsers.incUsed(partition, 1027  resourceUsage.getUsed(partition)); 1028  1029  if (LOG.isDebugEnabled()) { 1030  LOG.debug("User '" + userName 1031  + "' has become non-active.Hence move user to non-active list." 1032  + "Active users size = " + activeUsersSet.size() 1033  + "Non-active users size = " + nonActiveUsersSet.size() 1034  + "Total Resource usage for active users=" 1035  + totalResUsageForActiveUsers.getAllUsed() + "." 1036  + "Total Resource usage for non-active users=" 1037  + totalResUsageForNonActiveUsers.getAllUsed()); 1038  } 1039  } 1040  } 1041  } finally { 1042  this.writeLock.unlock(); 1043  } 1044  } 1045  1046  private ResourceUsage getTotalResourceUsagePerUser(String userName) { 1047  if (nonActiveUsersSet.contains(userName)) { 1048  return totalResUsageForNonActiveUsers; 1049  } else if (activeUsersSet.contains(userName)) { 1050  return totalResUsageForActiveUsers; 1051  } else { 1052  LOG.warn("User '" + userName 1053  + "' is not present in active/non-active. This is highly unlikely." 1054  + "We can consider this user in non-active list in this case."); 1055  return totalResUsageForNonActiveUsers; 1056  } 1057  } 1058  1059  /** 1060  * During container allocate/release, ensure that all user specific data 1061  * structures are updated. 
1062  * 1063  * @param userName 1064  * Name of the user 1065  * @param resource 1066  * Resource to increment/decrement 1067  * @param nodePartition 1068  * Node label 1069  * @param isAllocate 1070  * Indicate whether to allocate or release resource 1071  * @return user 1072  */ 1073  public User updateUserResourceUsage(String userName, Resource resource, 1074  String nodePartition, boolean isAllocate) { 1075  this.writeLock.lock(); 1076  try { 1077  1078  // TODO, should use getUser, use this method just to avoid UT failure 1079  // which is caused by wrong invoking order, will fix UT separately 1080  User user = getUserAndAddIfAbsent(userName); 1081  1082  // New container is allocated. Invalidate user-limit. 1083  updateResourceUsagePerUser(user, resource, nodePartition, isAllocate); 1084  1085  userLimitNeedsRecompute(); 1086  1087  // Update usage ratios 1088  Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, 1089  scheduler.getClusterResource()); 1090  incQueueUsageRatio(nodePartition, user.updateUsageRatio( 1091  resourceCalculator, resourceByLabel, nodePartition)); 1092  1093  return user; 1094  } finally { 1095  this.writeLock.unlock(); 1096  } 1097  } 1098  1099  private void updateResourceUsagePerUser(User user, Resource resource, 1100  String nodePartition, boolean isAllocate) { 1101  ResourceUsage totalResourceUsageForUsers = getTotalResourceUsagePerUser( 1102  user.userName); 1103  1104  if (isAllocate) { 1105  user.getResourceUsage().incUsed(nodePartition, resource); 1106  totalResourceUsageForUsers.incUsed(nodePartition, resource); 1107  } else { 1108  user.getResourceUsage().decUsed(nodePartition, resource); 1109  totalResourceUsageForUsers.decUsed(nodePartition, resource); 1110  } 1111  1112  if (LOG.isDebugEnabled()) { 1113  LOG.debug( 1114  "User resource is updated." + "Total Resource usage for active users=" 1115  + totalResUsageForActiveUsers.getAllUsed() + "." 
1116  + "Total Resource usage for non-active users=" 1117  + totalResUsageForNonActiveUsers.getAllUsed()); 1118  } 1119  } 1120  1121  public void updateUserWeights() { 1122  this.writeLock.lock(); 1123  try { 1124  for (Map.Entry<String, User> ue : users.entrySet()) { 1125  ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey())); 1126  } 1127  activeUsersTimesWeights = sumActiveUsersTimesWeights(); 1128  allUsersTimesWeights = sumAllUsersTimesWeights(); 1129  userLimitNeedsRecompute(); 1130  } finally { 1131  this.writeLock.unlock(); 1132  } 1133  } 1134  1135  @VisibleForTesting 1136  public int getNumActiveUsersWithOnlyPendingApps() { 1137  return activeUsersWithOnlyPendingApps.get(); 1138  } 1139  1140  @VisibleForTesting 1141  void setUsageRatio(String label, float usage) { 1142  qUsageRatios.usageRatios.put(label, usage); 1143  } 1144 }