diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index 7bd2616..35ebf2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -111,6 +111,7 @@
   private ReservationQueueConfiguration globalReservationQueueConfig;
 
   private final Set nonPreemptableQueues;
+  private final double nodeBalanceThreshold;
 
   public AllocationConfiguration(Map minQueueResources,
       Map maxQueueResources,
@@ -131,7 +132,7 @@ public AllocationConfiguration(Map minQueueResources,
       Map> configuredQueues,
       ReservationQueueConfiguration globalReservationQueueConfig,
       Set reservableQueues,
-      Set nonPreemptableQueues) {
+      Set nonPreemptableQueues, double nodeBalanceThreshold) {
     this.minQueueResources = minQueueResources;
     this.maxQueueResources = maxQueueResources;
     this.maxChildQueueResources = maxChildQueueResources;
@@ -155,6 +156,7 @@ public AllocationConfiguration(Map minQueueResources,
     this.placementPolicy = placementPolicy;
     this.configuredQueues = configuredQueues;
     this.nonPreemptableQueues = nonPreemptableQueues;
+    this.nodeBalanceThreshold = nodeBalanceThreshold;
   }
 
   public AllocationConfiguration(Configuration conf) {
@@ -184,6 +186,7 @@ public AllocationConfiguration(Configuration conf) {
     placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
         configuredQueues);
     nonPreemptableQueues = new HashSet<>();
+    nodeBalanceThreshold = 0;
   }
 
   /**
@@ -434,4 +437,16 @@ public void initFSQueue(FSQueue queue, FairScheduler scheduler){
     queue.getMetrics().setMaxApps(getQueueMaxApps(name));
     queue.getMetrics().setSchedulingPolicy(getSchedulingPolicy(name).getName());
   }
+
+  /**
+   * Returns the node balance threshold read from the allocation file.
+   * FSAppAttempt multiplies it by the number of cluster nodes to decide how
+   * many skipped allocations to tolerate before allowing an allocation that
+   * does not improve a node's resource balance.
+   *
+   * @return the configured threshold, or 0 if none was configured
+   */
+  public double getNodeBalanceThreshold() {
+    return nodeBalanceThreshold;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index cd4a19b..72d679f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -259,6 +259,7 @@ public synchronized void reloadAllocations() throws IOException,
     String reservationAdmissionPolicy = null;
 
     QueuePlacementPolicy newPlacementPolicy = null;
+    double nodeBalanceThreshold = 0.0;
 
     // Remember all queue names so we can display them on web UI, etc.
     // configuredQueues is segregated based on whether it is a leaf queue
@@ -364,6 +365,9 @@ public synchronized void reloadAllocations() throws IOException,
         } else if ("reservation-policy".equals(element.getTagName())) {
           String text = ((Text) element.getFirstChild()).getData().trim();
           reservationAdmissionPolicy = text;
+        } else if ("nodeBalanceThreshold".equals(element.getTagName())) {
+          String text = ((Text) element.getFirstChild()).getData().trim();
+          nodeBalanceThreshold = Double.parseDouble(text);
         } else {
           LOG.warn("Bad element in allocations file: " + element.getTagName());
         }
@@ -436,7 +440,8 @@ public synchronized void reloadAllocations() throws IOException,
         defaultSchedPolicy, minSharePreemptionTimeouts,
         fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
         reservationAcls, newPlacementPolicy, configuredQueues,
-        globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
+        globalReservationQueueConfig, reservableQueues, nonPreemptableQueues,
+        nodeBalanceThreshold);
 
     lastSuccessfulReload = clock.getTime();
     lastReloadAttemptFailed = false;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 0715e3a..eb5601d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
@@ -109,6 +110,10 @@
   private final Map allowedLocalityLevel
       = new HashMap<>();
 
+  // Allocations rejected by checkBalance since the last assignment attempt;
+  // compared against numNodes * nodeBalanceThreshold in isImbalanceAllowed.
+  protected long nodeBalanceOpportunity = 0;
+
   public FSAppAttempt(FairScheduler scheduler,
       ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue,
       ActiveUsersManager activeUsersManager, RMContext rmContext) {
@@ -773,6 +778,8 @@ private Resource assignContainer(
       FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
       boolean reserved, SchedulerRequestKey schedulerKey) {
 
+    nodeBalanceOpportunity = 0;
+
     // How much does this request need?
     Resource capability = pendingAsk.getPerAllocationResource();
 
@@ -863,6 +870,47 @@ private boolean isOverAMShareLimit() {
     return false;
   }
 
+  /**
+   * Decides whether this attempt has been skipped often enough to accept an
+   * allocation that does not improve the node's resource balance.
+   *
+   * @param opportunity allocations skipped for balance reasons so far
+   * @param numNodes number of nodes in the cluster
+   * @param threshold multiplier applied to {@code numNodes}
+   * @return true if {@code opportunity >= numNodes * threshold}
+   */
+  private boolean isImbalanceAllowed(long opportunity, int numNodes,
+      double threshold) {
+    return opportunity >= numNodes * threshold;
+  }
+
+  /**
+   * Checks whether the request may be placed on the node under the node
+   * balancing policy, counting the rejection when it may not.
+   *
+   * @param node the node being offered
+   * @param request the pending ask considered for the node
+   * @param allowImbalance if true, the balance check is bypassed
+   * @return true if the allocation may proceed; false if it was skipped
+   */
+  private boolean checkBalance(FSSchedulerNode node, PendingAsk request,
+      boolean allowImbalance) {
+    if (allowImbalance) {
+      return true;
+    }
+    ResourceType requestType = scheduler.determineNodeDominantResourceType(
+        request.getPerAllocationResource(), node.getTotalResource());
+    if (requestType != node.getNodeDominantResourceType()) {
+      // The request is dominated by a different resource than the node's
+      // current usage, so placing it here helps balance the node.
+      return true;
+    }
+    // Count the skipped allocation; without this the counter stays at 0 and
+    // isImbalanceAllowed can never let a starving request through.
+    nodeBalanceOpportunity++;
+    return false;
+  }
+
   private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
@@ -917,8 +965,14 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
               scheduler.getRackLocalityThreshold());
         }
 
+        boolean allowImbalance = reserved
+            || isImbalanceAllowed(nodeBalanceOpportunity,
+                scheduler.getNumClusterNodes(),
+                scheduler.allocConf.getNodeBalanceThreshold());
+
         if (rackLocalPendingAsk.getCount() > 0
-            && nodeLocalPendingAsk.getCount() > 0) {
+            && nodeLocalPendingAsk.getCount() > 0
+            && checkBalance(node, nodeLocalPendingAsk, allowImbalance)) {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Assign container on " + node.getNodeName()
                 + " node, assignType: NODE_LOCAL" + ", allowedLocality: "
@@ -935,7 +989,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
 
         if (rackLocalPendingAsk.getCount() > 0
             && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
-            .equals(NodeType.OFF_SWITCH))) {
+            .equals(NodeType.OFF_SWITCH))
+            && checkBalance(node, rackLocalPendingAsk, allowImbalance)) {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Assign container on " + node.getNodeName()
                 + " node, assignType: RACK_LOCAL" + ", allowedLocality: "
@@ -952,7 +1007,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
           continue;
         }
 
-        if (offswitchAsk.getCount() > 0) {
+        if (offswitchAsk.getCount() > 0
+            && checkBalance(node, offswitchAsk, allowImbalance)) {
           if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks()
               <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
             if (LOG.isTraceEnabled()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index d983ea0..fb9b86d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -43,6 +44,10 @@
   private final Set containersForPreemption =
       new ConcurrentSkipListSet<>();
 
+  // Dominant resource type of this node's usage, as last recorded through
+  // setNodeDominantResourceType; null until it is first recorded.
+  private ResourceType nodeDominantResourceType;
+
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
   }
@@ -137,4 +142,22 @@ void addContainersForPreemption(Collection containers) {
   void removeContainerForPreemption(RMContainer container) {
     containersForPreemption.remove(container);
   }
+
+  /**
+   * @return the dominant resource type last recorded for this node, or
+   *         null if none has been recorded yet
+   */
+  public ResourceType getNodeDominantResourceType() {
+    return nodeDominantResourceType;
+  }
+
+  /**
+   * Records the dominant resource type of this node's current usage.
+   *
+   * @param nodeDominantResourceType the dominant resource type to record
+   */
+  public void setNodeDominantResourceType(
+      ResourceType nodeDominantResourceType) {
+    this.nodeDominantResourceType = nodeDominantResourceType;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 97871e7..d1ce40a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -996,6 +997,9 @@ void attemptScheduling(FSSchedulerNode node) {
           node.getUnallocatedResource(), 0.5f);
       while (node.getReservedContainer() == null) {
         boolean assignedContainer = false;
+        // update dominant resource before scheduling attempts
+        node.setNodeDominantResourceType(determineNodeDominantResourceType(
+            node.getAllocatedResource(), node.getTotalResource()));
         Resource assignment = queueMgr.getRootQueue().assignContainer(node);
         if (!assignment.equals(Resources.none())) {
           assignedContainers++;
@@ -1778,4 +1782,34 @@ protected void decreaseContainer(
   public float getReservableNodesRatio() {
     return reservableNodesRatio;
   }
+
+  /**
+   * Determines which resource dominates {@code used}, measured as a share of
+   * {@code total}.
+   *
+   * @param used the resource amount to classify
+   * @param total the capacity the amount is measured against
+   * @return {@link ResourceType#MEMORY} if the memory share is strictly
+   *         larger than the vcores share, {@link ResourceType#CPU} otherwise
+   */
+  ResourceType determineNodeDominantResourceType(Resource used,
+      Resource total) {
+    double memoryRatio = shareOf(used.getMemory(), total.getMemory());
+    double vcoresRatio =
+        shareOf(used.getVirtualCores(), total.getVirtualCores());
+    return memoryRatio > vcoresRatio ? ResourceType.MEMORY : ResourceType.CPU;
+  }
+
+  /**
+   * Returns used / total as a double without ever producing NaN: a
+   * non-positive total yields 0 when nothing is used and infinity otherwise.
+   */
+  private static double shareOf(long used, long total) {
+    if (total <= 0) {
+      // 0/0 would be NaN, which makes every comparison above false.
+      return used > 0 ? Double.POSITIVE_INFINITY : 0.0;
+    }
+    return ((double) used) / total;
+  }
+
 }