commit b8d97652317708551d64ff29a336392c27405564 Author: Wangda Tan Date: Tue May 24 00:19:55 2016 -0700 wip.. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index dc90c5b..3acd362 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,6 +53,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -687,4 +691,14 @@ public Resource getTotalKillableResource(String partition) { return csContext.getPreemptionManager().getKillableContainers(queueName, partition); } + + // Only for testing + @VisibleForTesting + public synchronized CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits currentResourceLimits, + SchedulingMode schedulingMode) { + return assignContainers(clusterResource, + new NodeCandidates(node, Arrays.asList(node), node.getPartition()), + currentResourceLimits, schedulingMode); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index daf7790..485d93d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Stable; import 
org.apache.hadoop.security.AccessControlException; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -195,13 +197,18 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, /** * Assign containers to applications in the queue or it's children (if any). * @param clusterResource the resource of the cluster. - * @param node node on which resources are available - * @param resourceLimits how much overall resource of this queue can use. - * @param schedulingMode Type of exclusive check when assign container on a + * @param nodeCandidatesFilter candidate nodes on which resources are available + * @param resourceLimits how much overall resource this queue can use. + * @param schedulingMode Type of exclusivity check when assigning a container on a * NodeManager, see {@link SchedulingMode}. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, + NodeCandidates nodeCandidatesFilter, ResourceLimits resourceLimits, + SchedulingMode schedulingMode); + + @VisibleForTesting + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 920e983..4268703 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -1245,7 +1246,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { assignment = queue.assignContainers( getClusterResource(), - node, + new NodeCandidates(node, null, node.getPartition()), // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel( @@ -1278,7 +1279,8 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { assignment = root.assignContainers( getClusterResource(), - node, + new NodeCandidates<>(node, getAllNodes(), + node.getPartition()), new ResourceLimits(labelManager.getResourceByLabel( node.getPartition(), getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -1309,7 +1311,8 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // Try to use NON_EXCLUSIVE assignment = root.assignContainers( getClusterResource(), - node, + new NodeCandidates<>(node, getAllNodes(), + node.getPartition()), // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d5d1374..fdd9cac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1077,4 +1077,8 @@ public boolean getLazyPreemptionEnabled() { PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers"; public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS = false; + + public static final String ENABLE_GLOBAL_SCHEDULING = + PREFIX + "global-scheduling-enabled"; + public static final boolean DEFAULT_GLOBAL_SCHEDULING_ENABLED = false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6dcafec..9589168 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -29,6 +30,7 @@ import java.util.Set; import java.util.TreeSet; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; @@ -807,9 +810,12 @@ private synchronized FiCaSchedulerApp getApplication( } private void handleExcessReservedContainer(Resource clusterResource, - CSAssignment assignment, FiCaSchedulerNode node, FiCaSchedulerApp app) { + CSAssignment assignment) { if (assignment.getExcessReservation() != null) { RMContainer excessReservedContainer = assignment.getExcessReservation(); + FiCaSchedulerNode node = csContext.getNode( + excessReservedContainer.getReservedNode()); + FiCaSchedulerApp app = assignment.getApplication(); if (excessReservedContainer.hasIncreaseReservation()) { unreserveIncreasedContainer(clusterResource, @@ -829,7 +835,6 @@ private void handleExcessReservedContainer(Resource clusterResource, } private void killToPreemptContainers(Resource clusterResource, - FiCaSchedulerNode node, CSAssignment assignment) { if (assignment.getContainersToKill() != null) { StringBuilder sb = new StringBuilder("Killing containers: ["); @@ -838,6 +843,7 @@ private void killToPreemptContainers(Resource clusterResource, FiCaSchedulerApp application = csContext.getApplicationAttempt( c.getApplicationAttemptId()); LeafQueue q = application.getCSLeafQueue(); + FiCaSchedulerNode node = csContext.getNode(c.getAllocatedNode()); q.completedContainer(clusterResource, application, node, c, SchedulerUtils .createPreemptedContainerStatus(c.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, @@ -861,50 +867,64 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); } + + private synchronized CSAssignment handleReservedContainer( + Resource clusterResource, NodeCandidates nodeCandidatesFilter, + ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { + RMContainer reservedContainer = + nodeCandidatesFilter.getNextAvailable().getReservedContainer(); + if (reservedContainer != null) { + FiCaSchedulerApp application = + getApplication(reservedContainer.getApplicationAttemptId()); + synchronized (application) { + CSAssignment assignment = + application.assignContainers(clusterResource, nodeCandidatesFilter, + currentResourceLimits, schedulingMode, reservedContainer); + handleExcessReservedContainer(clusterResource, assignment); + killToPreemptContainers(clusterResource, assignment); + return assignment; + } + } + + return null; + } @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits currentResourceLimits, + NodeCandidates nodeCandidatesFilter, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { updateCurrentResourceLimits(currentResourceLimits, clusterResource); if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getNodeName() - + " #applications=" + orderingPolicy.getNumSchedulableEntities()); + LOG.debug("assignContainers: nodePartition=" + nodeCandidatesFilter + .getPartition() + " #applications=" + orderingPolicy + 
.getNumSchedulableEntities()); } - setPreemptionAllowed(currentResourceLimits, node.getPartition()); + setPreemptionAllowed(currentResourceLimits, + nodeCandidatesFilter.getPartition()); // Check for reserved resources - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FiCaSchedulerApp application = - getApplication(reservedContainer.getApplicationAttemptId()); - synchronized (application) { - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, reservedContainer); - handleExcessReservedContainer(clusterResource, assignment, node, - application); - killToPreemptContainers(clusterResource, node, assignment); - return assignment; - } + CSAssignment assignment = handleReservedContainer(clusterResource, + nodeCandidatesFilter, currentResourceLimits, schedulingMode); + if (null != assignment) { + return assignment; } // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { + && !accessibleToPartition(nodeCandidatesFilter.getPartition())) { return CSAssignment.NULL_ASSIGNMENT; } // Check if this queue need more resource, simply skip allocation if this // queue doesn't need more resources. - if (!hasPendingResourceRequest(node.getPartition(), clusterResource, - schedulingMode)) { + if (!hasPendingResourceRequest(nodeCandidatesFilter.getPartition(), + clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node.getPartition()); + + schedulingMode.name() + " node-partition=" + nodeCandidatesFilter.getPartition()); } return CSAssignment.NULL_ASSIGNMENT; } @@ -914,7 +934,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerApp application = assignmentIterator.next(); // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + if (!super.canAssignToThisQueue(clusterResource, nodeCandidatesFilter.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { return CSAssignment.NULL_ASSIGNMENT; @@ -922,20 +942,19 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, - node.getPartition(), schedulingMode); + nodeCandidatesFilter.getPartition(), schedulingMode); // Check user limit if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, node.getPartition(), currentResourceLimits)) { + application, nodeCandidatesFilter.getPartition(), currentResourceLimits)) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); continue; } // Try to schedule - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, null); + assignment = application.assignContainers(clusterResource, + nodeCandidatesFilter, currentResourceLimits, schedulingMode, null); if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " @@ -946,9 +965,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Did we schedule or reserve a container? 
Resource assigned = assignment.getResource(); - handleExcessReservedContainer(clusterResource, assignment, node, - application); - killToPreemptContainers(clusterResource, node, assignment); + handleExcessReservedContainer(clusterResource, assignment); + killToPreemptContainers(clusterResource, assignment); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -960,20 +978,21 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(clusterResource, application, assigned, - node.getPartition(), reservedOrAllocatedRMContainer, + nodeCandidatesFilter.getPartition(), reservedOrAllocatedRMContainer, assignment.isIncreasedAllocation()); // Update reserved metrics Resource reservedRes = assignment.getAssignmentInformation() .getReserved(); if (reservedRes != null && !reservedRes.equals(Resources.none())) { - incReservedResource(node.getPartition(), reservedRes); + incReservedResource(nodeCandidatesFilter.getPartition(), reservedRes); } // Done return assignment; } else if (assignment.getSkipped()) { - application.updateNodeInfoForAMDiagnostics(node); + application.updateNodeInfoForAMDiagnostics( + (FiCaSchedulerNode) nodeCandidatesFilter.getNextAvailable()); } else { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6fcd6c1..128f600 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -383,35 +383,37 @@ private synchronized void removeApplication(ApplicationId applicationId, @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, 
ResourceLimits resourceLimits, + NodeCandidates nodeCandidatesFilter, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { + String partition = nodeCandidatesFilter.getPartition(); + // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { + && !accessibleToPartition(nodeCandidatesFilter.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + node - .getPartition()); + + ", because it is not able to access partition=" + partition); } return CSAssignment.NULL_ASSIGNMENT; } // Check if this queue need more resource, simply skip allocation if this // queue doesn't need more resources. - if (!super.hasPendingResourceRequest(node.getPartition(), - clusterResource, schedulingMode)) { + if (!super.hasPendingResourceRequest(partition, clusterResource, + schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node.getPartition()); + + schedulingMode.name() + " node-partition=" + partition); } return CSAssignment.NULL_ASSIGNMENT; } CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - while (canAssign(clusterResource, node)) { + + while (canAssign(clusterResource, + (FiCaSchedulerNode) nodeCandidatesFilter.getNextAvailable())) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign containers to child-queue of " + getQueueName()); @@ -420,7 +422,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Are we over maximum-capacity for this queue? 
// This will also consider parent's limits and also continuous reservation // looking - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + if (!super.canAssignToThisQueue(clusterResource, partition, resourceLimits, Resources.createResource( getMetrics().getReservedMB(), getMetrics() .getReservedVirtualCores()), schedulingMode)) { @@ -429,7 +431,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Schedule CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, resourceLimits, + assignContainersToChildQueues(clusterResource, nodeCandidatesFilter, resourceLimits, schedulingMode); assignment.setType(assignedToChild.getType()); @@ -439,7 +441,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue allocateResource(clusterResource, assignedToChild.getResource(), - node.getPartition(), assignedToChild.isIncreasedAllocation()); + partition, assignedToChild.isIncreasedAllocation()); // Track resource utilization in this pass of the scheduler Resources @@ -500,6 +502,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, return assignment; } + // FIXME: + // Only check next-node.available resource only at root queue, and check queue's + // available resource for partition private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { // Two conditions need to meet when trying to allocate: // 1) Node doesn't have reserved container @@ -546,8 +551,8 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, return new ResourceLimits(childLimit); } - private Iterator sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) { - if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + private Iterator sortAndGetChildrenAllocationIterator(String partition) { + if (partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (needToResortQueuesAtNextAllocation) { // If we skipped resort queues last time, we need to re-sort queue // before allocation @@ -559,23 +564,25 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, return childQueues.iterator(); } - partitionQueueComparator.setPartitionToLookAt(node.getPartition()); + partitionQueueComparator.setPartitionToLookAt(partition); List childrenList = new ArrayList<>(childQueues); Collections.sort(childrenList, partitionQueueComparator); return childrenList.iterator(); } - + private synchronized CSAssignment assignContainersToChildQueues( - Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, - SchedulingMode schedulingMode) { + Resource cluster, NodeCandidates nodeCandidatesFilter, + ResourceLimits limits, SchedulingMode schedulingMode) { + String partition = nodeCandidatesFilter.getPartition(); + CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); printChildQueues(); // Try to assign to most 'under-served' sub-queue - for (Iterator iter = sortAndGetChildrenAllocationIterator(node); iter - .hasNext();) { + for (Iterator iter = sortAndGetChildrenAllocationIterator( + partition); iter.hasNext(); ) { CSQueue childQueue = iter.next(); if(LOG.isDebugEnabled()) { LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() @@ -584,9 +591,9 @@ private synchronized CSAssignment assignContainersToChildQueues( // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - 
getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); + getResourceLimitsOfChild(childQueue, cluster, limits, partition); - assignment = childQueue.assignContainers(cluster, node, + assignment = childQueue.assignContainers(cluster, nodeCandidatesFilter, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + @@ -600,7 +607,7 @@ private synchronized CSAssignment assignContainersToChildQueues( assignment.getResource(), Resources.none())) { // Only update childQueues when we doing non-partitioned node // allocation. - if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) { + if (RMNodeLabelsManager.NO_LABEL.equals(partition)) { // Remove and re-insert to sort iter.remove(); LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index afac235..4c42f5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -128,6 +128,6 @@ protected CSAssignment getCSAssignmentFromAllocateResult( * */ public abstract CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + NodeCandidates nodeCandidatesFilter, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index 8f749f6..0a9ab7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.List; @@ -60,6 +62,7 @@ NodeType requestNodeType = NodeType.NODE_LOCAL; Container updatedContainer; private List toKillContainers; + FiCaSchedulerNode nodeToAllocate; public ContainerAllocation(RMContainer containerToBeUnreserved, Resource resourceToBeAllocated, AllocationState state) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 3be8e0e..9fc572a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -25,8 +25,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -46,17 +46,17 @@ public ContainerAllocator(FiCaSchedulerApp application, @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + NodeCandidates nodeCandidatesFilter, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { if (reservedContainer != null) { if (reservedContainer.getState() == RMContainerState.RESERVED) { // It's a regular container return regularContainerAllocator.assignContainers(clusterResource, - node, schedulingMode, resourceLimits, reservedContainer); + nodeCandidatesFilter, schedulingMode, resourceLimits, reservedContainer); } else { // It's a increase container return increaseContainerAllocator.assignContainers(clusterResource, - node, schedulingMode, resourceLimits, reservedContainer); + nodeCandidatesFilter, schedulingMode, resourceLimits, reservedContainer); } } else { /* @@ -64,14 +64,16 @@ public CSAssignment assignContainers(Resource clusterResource, * anything, we will try to allocate regular container */ 
CSAssignment assign = - increaseContainerAllocator.assignContainers(clusterResource, node, + increaseContainerAllocator.assignContainers(clusterResource, + nodeCandidatesFilter, schedulingMode, resourceLimits, null); if (Resources.greaterThan(rc, clusterResource, assign.getResource(), Resources.none())) { return assign; } - return regularContainerAllocator.assignContainers(clusterResource, node, + return regularContainerAllocator.assignContainers(clusterResource, + nodeCandidatesFilter, schedulingMode, resourceLimits, null); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 25e5824..a810dca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -172,17 +173,17 @@ private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node, @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + NodeCandidates nodeCandidatesFilter, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { AppSchedulingInfo sinfo = application.getAppSchedulingInfo(); - NodeId nodeId = node.getNodeID(); + NodeId nodeId = nodeCandidatesFilter.getNextAvailable().getNodeID(); if (reservedContainer == null) { // Do we have increase request on this node? if (!sinfo.hasIncreaseRequest(nodeId)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip allocating increase request since we don't have any" - + " increase request on this node=" + node.getNodeID()); + + " increase request on this node=" + nodeId); } return CSAssignment.SKIP_ASSIGNMENT; @@ -291,7 +292,8 @@ public CSAssignment assignContainers(Resource clusterResource, } if (!Resources.fitsIn(rc, clusterResource, - increaseRequest.getTargetCapacity(), node.getTotalResource())) { + increaseRequest.getTargetCapacity(), + nodeCandidatesFilter.getNextAvailable().getTotalResource())) { // if the target capacity is more than what the node can offer, we // will simply remove and skip it. // The reason of doing check here instead of adding increase request @@ -299,15 +301,18 @@ public CSAssignment assignContainers(Resource clusterResource, // request added. 
if (LOG.isDebugEnabled()) { LOG.debug(" Target capacity is more than what node can offer," - + " node.resource=" + node.getTotalResource()); + + " node.resource=" + nodeCandidatesFilter.getNextAvailable() + .getTotalResource()); } toBeRemovedRequests.add(increaseRequest); continue; } // Try to allocate the increase request - assigned = - allocateIncreaseRequest(node, clusterResource, increaseRequest); + assigned = allocateIncreaseRequest( + (FiCaSchedulerNode) nodeCandidatesFilter.getNextAvailable(), + clusterResource, + increaseRequest); if (!assigned.getSkipped()) { // When we don't skip this request, which means we either allocated // OR reserved this request. We will break @@ -357,9 +362,9 @@ public CSAssignment assignContainers(Resource clusterResource, // We don't need this container now, just return excessive reservation return new CSAssignment(application, reservedContainer); } - - return allocateIncreaseRequestFromReservedContainer(node, clusterResource, - request); + + return allocateIncreaseRequestFromReservedContainer( + nodeCandidatesFilter.getNextAvailable(), clusterResource, request); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index b2d4bbe..604ae6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; +import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Container; @@ -32,17 +33,24 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer.SchedulerNodesScorer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer.SchedulerNodesScorerCache; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; /** @@ -50,45 +58,39 @@ * delayed scheduling mechanism to get better locality allocation. */ public class RegularContainerAllocator extends AbstractContainerAllocator { - private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); - + private static final Log LOG = LogFactory.getLog( + RegularContainerAllocator.class); + private ResourceRequest lastResourceRequest = null; - + public RegularContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { super(application, rc, rmContext); } - - private boolean checkHeadroom(Resource clusterResource, + + private boolean checkHeadroomForPartition(Resource clusterResource, ResourceLimits currentResourceLimits, Resource required, - FiCaSchedulerNode node) { + String partition) { // If headroom + currentReservation < required, we cannot allocate this // require Resource resourceCouldBeUnReserved = application.getCurrentReservation(); if (!application.getCSLeafQueue().getReservationContinueLooking() - || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + || partition.equals(RMNodeLabelsManager.NO_LABEL)) { // If we don't allow reservation continuous looking, OR we're looking at // non-default node partition, we won't allow to unreserve before // allocation. 
resourceCouldBeUnReserved = Resources.none(); } - return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add( - currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), + return Resources.greaterThanOrEqual(rc, clusterResource, Resources + .add(currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), required); } - private ContainerAllocation preCheckForNewContainer(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + String partition, SchedulingMode schedulingMode, ResourceLimits resourceLimits, Priority priority) { - if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) { - application.updateAppSkipNodeDiagnostics( - CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); - return ContainerAllocation.APP_SKIPPED; - } - - ResourceRequest anyRequest = - application.getResourceRequest(priority, ResourceRequest.ANY); + ResourceRequest anyRequest = application.getResourceRequest(priority, + ResourceRequest.ANY); if (null == anyRequest) { return ContainerAllocation.PRIORITY_SKIPPED; } @@ -106,8 +108,8 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { if (application.isWaitingForAMContainer()) { if (LOG.isDebugEnabled()) { - LOG.debug("Skip allocating AM container to app_attempt=" - + application.getApplicationAttemptId() + LOG.debug("Skip allocating AM container to app_attempt=" + application + .getApplicationAttemptId() + ", don't allow to allocate AM container in non-exclusive mode"); } application.updateAppSkipNodeDiagnostics( @@ -120,8 +122,7 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, // matches the node's label? // If not match, jump to next priority. 
if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - anyRequest.getNodeLabelExpression(), node.getPartition(), - schedulingMode)) { + anyRequest.getNodeLabelExpression(), partition, schedulingMode)) { return ContainerAllocation.PRIORITY_SKIPPED; } @@ -134,7 +135,8 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, } } - if (!checkHeadroom(clusterResource, resourceLimits, required, node)) { + if (!checkHeadroomForPartition(clusterResource, resourceLimits, required, + partition)) { if (LOG.isDebugEnabled()) { LOG.debug("cannot allocate required resource=" + required + " because of headroom"); @@ -149,11 +151,11 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, // This is to make sure non-partitioned-resource-request will prefer // to be allocated to non-partitioned nodes int missedNonPartitionedRequestSchedulingOpportunity = 0; - if (anyRequest.getNodeLabelExpression() - .equals(RMNodeLabelsManager.NO_LABEL)) { + if (anyRequest.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL)) { missedNonPartitionedRequestSchedulingOpportunity = - application - .addMissedNonPartitionedRequestSchedulingOpportunity(priority); + application.addMissedNonPartitionedRequestSchedulingOpportunity( + priority); } if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { @@ -174,7 +176,7 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, return ContainerAllocation.APP_SKIPPED; } } - + return null; } @@ -182,16 +184,7 @@ ContainerAllocation preAllocation(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, Priority priority, RMContainer reservedContainer) { - ContainerAllocation result; - if (null == reservedContainer) { - // pre-check when allocating new container - result = - preCheckForNewContainer(clusterResource, node, schedulingMode, - resourceLimits, priority); - if (null != result) { - return result; - } - } else { + if (null != reservedContainer) { // pre-check when allocating reserved container if (application.getTotalRequiredResources(priority) == 0) { // Release @@ -200,35 +193,36 @@ ContainerAllocation preAllocation(Resource clusterResource, } } + ContainerAllocation result; // Try to allocate containers on node - result = - assignContainersOnNode(clusterResource, node, priority, - reservedContainer, schedulingMode, resourceLimits); - + result = assignContainersOnNode(clusterResource, node, priority, + reservedContainer, schedulingMode, resourceLimits); + if (null == reservedContainer) { if (result.state == AllocationState.PRIORITY_SKIPPED) { // Don't count 'skipped nodes' as a scheduling opportunity! application.subtractSchedulingOpportunity(priority); } } - + result.nodeToAllocate = node; + return result; } - - public synchronized float getLocalityWaitFactor( - Priority priority, int clusterNodes) { + + public synchronized float getLocalityWaitFactor(Priority priority, + int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) - int requiredResources = - Math.max(application.getResourceRequests(priority).size() - 1, 0); - + int requiredResources = Math.max( + application.getResourceRequests(priority).size() - 1, 0); + // waitFactor can't be more than '1' // i.e. 
no point skipping more than clustersize opportunities - return Math.min(((float)requiredResources / clusterNodes), 1.0f); + return Math.min(((float) requiredResources / clusterNodes), 1.0f); } - + private int getActualNodeLocalityDelay() { - return Math.min(rmContext.getScheduler().getNumClusterNodes(), application - .getCSLeafQueue().getNodeLocalityDelay()); + return Math.min(rmContext.getScheduler().getNumClusterNodes(), + application.getCSLeafQueue().getNodeLocalityDelay()); } private boolean canAssign(Priority priority, FiCaSchedulerNode node, @@ -241,14 +235,14 @@ private boolean canAssign(Priority priority, FiCaSchedulerNode node, } // 'Delay' off-switch - ResourceRequest offSwitchRequest = - application.getResourceRequest(priority, ResourceRequest.ANY); - long missedOpportunities = application.getSchedulingOpportunities(priority); + ResourceRequest offSwitchRequest = application.getResourceRequest( + priority, ResourceRequest.ANY); + long missedOpportunities = application.getSchedulingOpportunities( + priority); long requiredContainers = offSwitchRequest.getNumContainers(); - float localityWaitFactor = - getLocalityWaitFactor(priority, rmContext.getScheduler() - .getNumClusterNodes()); + float localityWaitFactor = getLocalityWaitFactor(priority, + rmContext.getScheduler().getNumClusterNodes()); // Cap the delay by the number of nodes in the cluster. Under most conditions // this means we will consider each node in the cluster before // accepting an off-switch assignment. @@ -257,8 +251,8 @@ private boolean canAssign(Priority priority, FiCaSchedulerNode node, } // Check if we need containers on this rack - ResourceRequest rackLocalRequest = - application.getResourceRequest(priority, node.getRackName()); + ResourceRequest rackLocalRequest = application.getResourceRequest(priority, + node.getRackName()); if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) { return false; } @@ -266,15 +260,16 @@ private boolean canAssign(Priority priority, FiCaSchedulerNode node, // If we are here, we do need containers on this rack for RACK_LOCAL req if (type == NodeType.RACK_LOCAL) { // 'Delay' rack-local just a little bit... - long missedOpportunities = application.getSchedulingOpportunities(priority); + long missedOpportunities = application.getSchedulingOpportunities( + priority); return getActualNodeLocalityDelay() < missedOpportunities; } // Check if we need containers on this host if (type == NodeType.NODE_LOCAL) { // Now check if we need containers on this host... 
- ResourceRequest nodeLocalRequest = - application.getResourceRequest(priority, node.getNodeName()); + ResourceRequest nodeLocalRequest = application.getResourceRequest( + priority, node.getNodeName()); if (nodeLocalRequest != null) { return nodeLocalRequest.getNumContainers() > 0; } @@ -329,19 +324,23 @@ private ContainerAllocation assignOffSwitchContainers( private ContainerAllocation assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) { + application.updateAppSkipNodeDiagnostics( + CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); + return ContainerAllocation.APP_SKIPPED; + } ContainerAllocation allocation; NodeType requestType = null; // Data-local - ResourceRequest nodeLocalResourceRequest = - application.getResourceRequest(priority, node.getNodeName()); + ResourceRequest nodeLocalResourceRequest = application.getResourceRequest( + priority, node.getNodeName()); if (nodeLocalResourceRequest != null) { requestType = NodeType.NODE_LOCAL; - allocation = - assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, - node, priority, reservedContainer, schedulingMode, - currentResoureLimits); + allocation = assignNodeLocalContainers(clusterResource, + nodeLocalResourceRequest, node, priority, reservedContainer, + schedulingMode, currentResoureLimits); if (Resources.greaterThan(rc, clusterResource, allocation.getResourceToBeAllocated(), Resources.none())) { allocation.requestNodeType = requestType; @@ -350,8 +349,8 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, } // Rack-local - ResourceRequest rackLocalResourceRequest = - application.getResourceRequest(priority, node.getRackName()); + ResourceRequest rackLocalResourceRequest = application.getResourceRequest( + priority, node.getRackName()); if (rackLocalResourceRequest != null) { if (!rackLocalResourceRequest.getRelaxLocality()) { return ContainerAllocation.PRIORITY_SKIPPED; @@ -361,10 +360,9 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, requestType = NodeType.RACK_LOCAL; } - allocation = - assignRackLocalContainers(clusterResource, rackLocalResourceRequest, - node, priority, reservedContainer, schedulingMode, - currentResoureLimits); + allocation = assignRackLocalContainers(clusterResource, + rackLocalResourceRequest, node, priority, reservedContainer, + schedulingMode, currentResoureLimits); if (Resources.greaterThan(rc, clusterResource, allocation.getResourceToBeAllocated(), Resources.none())) { allocation.requestNodeType = requestType; @@ -373,8 +371,8 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, } // Off-switch - ResourceRequest offSwitchResourceRequest = - application.getResourceRequest(priority, ResourceRequest.ANY); + ResourceRequest offSwitchResourceRequest = application.getResourceRequest( + priority, ResourceRequest.ANY); if (offSwitchResourceRequest != null) { if (!offSwitchResourceRequest.getRelaxLocality()) { return ContainerAllocation.PRIORITY_SKIPPED; @@ -384,12 +382,11 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, requestType = NodeType.OFF_SWITCH; } - allocation = - assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, priority, reservedContainer, schedulingMode, - currentResoureLimits); + allocation = 
assignOffSwitchContainers(clusterResource, + offSwitchResourceRequest, node, priority, reservedContainer, + schedulingMode, currentResoureLimits); allocation.requestNodeType = requestType; - + // When a returned allocation is LOCALITY_SKIPPED, since we're in // off-switch request now, we will skip this app w.r.t priorities if (allocation.state == AllocationState.LOCALITY_SKIPPED) { @@ -407,17 +404,17 @@ private ContainerAllocation assignContainer(Resource clusterResource, NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { lastResourceRequest = request; - + if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getNodeName() - + " application=" + application.getApplicationId() - + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type); + LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + + application.getApplicationId() + " priority=" + priority + .getPriority() + " request=" + request + " type=" + type); } // check if the resource request can access the label if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - request.getNodeLabelExpression(), node.getPartition(), schedulingMode)) { + request.getNodeLabelExpression(), node.getPartition(), + schedulingMode)) { // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed // We should un-reserve this container. @@ -429,8 +426,8 @@ private ContainerAllocation assignContainer(Resource clusterResource, Resource available = node.getUnallocatedResource(); Resource totalResource = node.getTotalResource(); - if (!Resources.lessThanOrEqual(rc, clusterResource, - capability, totalResource)) { + if (!Resources.lessThanOrEqual(rc, clusterResource, capability, + totalResource)) { LOG.warn("Node : " + node.getNodeID() + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); @@ -447,14 +444,12 @@ private ContainerAllocation assignContainer(Resource clusterResource, // How much need to unreserve equals to: // max(required - headroom, amountNeedUnreserve) - Resource resourceNeedToUnReserve = - Resources.max(rc, clusterResource, - Resources.subtract(capability, currentResoureLimits.getHeadroom()), - currentResoureLimits.getAmountNeededUnreserve()); + Resource resourceNeedToUnReserve = Resources.max(rc, clusterResource, + Resources.subtract(capability, currentResoureLimits.getHeadroom()), + currentResoureLimits.getAmountNeededUnreserve()); - boolean needToUnreserve = - Resources.greaterThan(rc, clusterResource, - resourceNeedToUnReserve, Resources.none()); + boolean needToUnreserve = Resources.greaterThan(rc, clusterResource, + resourceNeedToUnReserve, Resources.none()); RMContainer unreservedContainer = null; boolean reservationsContinueLooking = @@ -464,18 +459,16 @@ private ContainerAllocation assignContainer(Resource clusterResource, List toKillContainers = null; if (availableContainers == 0 && currentResoureLimits.isAllowPreemption()) { Resource availableAndKillable = Resources.clone(available); - for (RMContainer killableContainer : node - .getKillableContainers().values()) { + for (RMContainer killableContainer : node.getKillableContainers() + .values()) { if (null == toKillContainers) { toKillContainers = new ArrayList<>(); } toKillContainers.add(killableContainer); Resources.addTo(availableAndKillable, - killableContainer.getAllocatedResource()); - if 
(Resources.fitsIn(rc, - clusterResource, - capability, - availableAndKillable)) { + killableContainer.getAllocatedResource()); + if (Resources.fitsIn(rc, clusterResource, capability, + availableAndKillable)) { // Stop if we find enough spaces availableContainers = 1; break; @@ -487,8 +480,8 @@ private ContainerAllocation assignContainer(Resource clusterResource, // Allocate... // We will only do continuous reservation when this is not allocated from // reserved container - if (rmContainer == null && reservationsContinueLooking - && node.getLabels().isEmpty()) { + if (rmContainer == null && reservationsContinueLooking && node.getLabels() + .isEmpty()) { // when reservationsContinueLooking is set, we may need to unreserve // some containers to meet this queue, its parents', or the users' // resource limits. @@ -503,9 +496,8 @@ private ContainerAllocation assignContainer(Resource clusterResource, // under the limit. resourceNeedToUnReserve = capability; } - unreservedContainer = - application.findNodeToUnreserve(clusterResource, node, priority, - resourceNeedToUnReserve); + unreservedContainer = application.findNodeToUnreserve(clusterResource, + node, priority, resourceNeedToUnReserve); // When (minimum-unreserved-resource > 0 OR we cannot allocate // new/reserved // container (That means we *have to* unreserve some resource to @@ -518,9 +510,8 @@ private ContainerAllocation assignContainer(Resource clusterResource, } } - ContainerAllocation result = - new ContainerAllocation(unreservedContainer, request.getCapability(), - AllocationState.ALLOCATED); + ContainerAllocation result = new ContainerAllocation(unreservedContainer, + request.getCapability(), AllocationState.ALLOCATED); result.containerNodeType = type; result.setToKillContainers(toKillContainers); return result; @@ -537,72 +528,66 @@ private ContainerAllocation assignContainer(Resource clusterResource, LOG.debug("we needed to unreserve to be able to allocate"); } // Skip the locality request - return ContainerAllocation.LOCALITY_SKIPPED; + return ContainerAllocation.LOCALITY_SKIPPED; } } - ContainerAllocation result = - new ContainerAllocation(null, request.getCapability(), - AllocationState.RESERVED); + ContainerAllocation result = new ContainerAllocation(null, + request.getCapability(), AllocationState.RESERVED); result.containerNodeType = type; result.setToKillContainers(null); return result; } // Skip the locality request - return ContainerAllocation.LOCALITY_SKIPPED; + return ContainerAllocation.LOCALITY_SKIPPED; } } - boolean - shouldAllocOrReserveNewContainer(Priority priority, Resource required) { + boolean shouldAllocOrReserveNewContainer(Priority priority, + Resource required) { int requiredContainers = application.getTotalRequiredResources(priority); int reservedContainers = application.getNumReservedContainers(priority); int starvation = 0; if (reservedContainers > 0) { - float nodeFactor = - Resources - .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation()); + float nodeFactor = Resources.ratio(rc, required, + application.getCSLeafQueue().getMaximumAllocation()); // Use percentage of node required to bias against large containers... 
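To make the starvation computation in this hunk concrete, here is a worked example with illustrative numbers (the formula is the one shown in the patch; the inputs are invented):

  // Suppose this priority has 6 re-reservations, 2 reserved containers,
  // 1 still-required container, a request that is half of the queue's maximum
  // allocation (nodeFactor = 0.5f), and minimumAllocationFactor = 0.25f.
  int reReservations = 6, reservedContainers = 2, requiredContainers = 1;
  float nodeFactor = 0.5f, minAllocFactor = 0.25f;
  int starvation = (int) ((reReservations / (float) reservedContainers)
      * (1.0f - Math.min(nodeFactor, minAllocFactor)));   // (int) (3.0f * 0.75f) = 2
  boolean allocOrReserveNew =
      ((starvation + requiredContainers) - reservedContainers) > 0;   // (2 + 1) - 2 = 1 > 0

The more often reservations are re-reserved without being satisfied, and the smaller the request relative to the maximum allocation, the larger the starvation credit, so the application is allowed to pursue containers beyond its outstanding reservations.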
// Protect against corner case where you need the whole node with // Math.min(nodeFactor, minimumAllocationFactor) - starvation = - (int) ((application.getReReservations(priority) / - (float) reservedContainers) * (1.0f - (Math.min( - nodeFactor, application.getCSLeafQueue() - .getMinimumAllocationFactor())))); + starvation = (int) ((application.getReReservations(priority) + / (float) reservedContainers) * (1.0f - (Math.min(nodeFactor, + application.getCSLeafQueue().getMinimumAllocationFactor())))); if (LOG.isDebugEnabled()) { - LOG.debug("needsContainers:" + " app.#re-reserve=" - + application.getReReservations(priority) + " reserved=" - + reservedContainers + " nodeFactor=" + nodeFactor - + " minAllocFactor=" - + application.getCSLeafQueue().getMinimumAllocationFactor() - + " starvation=" + starvation); + LOG.debug("needsContainers:" + " app.#re-reserve=" + application + .getReReservations(priority) + " reserved=" + reservedContainers + + " nodeFactor=" + nodeFactor + " minAllocFactor=" + application + .getCSLeafQueue().getMinimumAllocationFactor() + " starvation=" + + starvation); } } return (((starvation + requiredContainers) - reservedContainers) > 0); } - + private Container getContainer(RMContainer rmContainer, FiCaSchedulerNode node, Resource capability, Priority priority) { - return (rmContainer != null) ? rmContainer.getContainer() - : createContainer(node, capability, priority); + return (rmContainer != null) ? rmContainer.getContainer() : createContainer( + node, capability, priority); } private Container createContainer(FiCaSchedulerNode node, Resource capability, Priority priority) { NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = - BuilderUtils.newContainerId(application.getApplicationAttemptId(), - application.getNewContainerId()); + ContainerId containerId = BuilderUtils.newContainerId( + application.getApplicationAttemptId(), application.getNewContainerId()); // Create the container - return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, null); + return BuilderUtils.newContainer(containerId, nodeId, + node.getRMNode().getHttpAddress(), capability, priority, null); } - + private ContainerAllocation handleNewContainerAllocation( ContainerAllocation allocationResult, FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, Container container) { @@ -611,52 +596,51 @@ private ContainerAllocation handleNewContainerAllocation( if (reservedContainer != null) { application.unreserve(priority, node, reservedContainer); } - + // Inform the application - RMContainer allocatedContainer = - application.allocate(allocationResult.containerNodeType, node, - priority, lastResourceRequest, container); + RMContainer allocatedContainer = application.allocate( + allocationResult.containerNodeType, node, priority, lastResourceRequest, + container); // Does the application need this resource? if (allocatedContainer == null) { // Skip this app if we failed to allocate. 
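For readability, the hand-off performed by handleNewContainerAllocation in this and the following hunk can be summarized as a short sequence (all names below are the ones used in the patch):

  // 1. If the allocation replaces an existing reservation, release it first:
  //        application.unreserve(priority, node, reservedContainer);
  // 2. Record the allocation on the application:
  //        RMContainer allocated = application.allocate(containerNodeType, node,
  //            priority, lastResourceRequest, container);
  //    allocate() returns null when the application no longer needs the resource.
  // 3. On null, return an APP_SKIPPED result that still carries
  //    containerToBeUnreserved, so the caller can account for any container
  //    unreserved on the way to this point.
  // 4. Otherwise charge the node (node.allocateContainer(allocated)) and update
  //    the application's locality statistics (incNumAllocatedContainers(...)).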
- ContainerAllocation ret = - new ContainerAllocation(allocationResult.containerToBeUnreserved, - null, AllocationState.APP_SKIPPED); + ContainerAllocation ret = new ContainerAllocation( + allocationResult.containerToBeUnreserved, null, + AllocationState.APP_SKIPPED); return ret; } // Inform the node node.allocateContainer(allocatedContainer); - + // update locality statistics application.incNumAllocatedContainers(allocationResult.containerNodeType, allocationResult.requestNodeType); - - return allocationResult; + + return allocationResult; } ContainerAllocation doAllocation(ContainerAllocation allocationResult, - FiCaSchedulerNode node, Priority priority, - RMContainer reservedContainer) { + Priority priority, RMContainer reservedContainer) { + FiCaSchedulerNode node = allocationResult.nodeToAllocate; + // Create the container if necessary - Container container = - getContainer(reservedContainer, node, - allocationResult.getResourceToBeAllocated(), priority); + Container container = getContainer(reservedContainer, node, + allocationResult.getResourceToBeAllocated(), priority); // something went wrong getting/creating the container if (container == null) { - application - .updateAppSkipNodeDiagnostics("Scheduling of container failed. "); + application.updateAppSkipNodeDiagnostics( + "Scheduling of container failed. "); LOG.warn("Couldn't get container for allocation!"); return ContainerAllocation.APP_SKIPPED; } if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) { // When allocating container - allocationResult = - handleNewContainerAllocation(allocationResult, node, priority, - reservedContainer, container); + allocationResult = handleNewContainerAllocation(allocationResult, node, + priority, reservedContainer, container); } else { // When reserving container application.reserve(priority, node, reservedContainer, container); @@ -685,66 +669,108 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, // Non-exclusive scheduling opportunity is different: we need reset // it every time to make sure non-labeled resource request will be // most likely allocated on non-labeled nodes first. 
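This hunk also reworks doAllocation(): the target node now travels inside the ContainerAllocation (nodeToAllocate) instead of being a parameter, so one result object can originate from any candidate node. A minimal sketch of the resulting call shape, mirroring the loop body introduced in the next hunk:

  ContainerAllocation result = preAllocation(clusterResource, node,
      schedulingMode, resourceLimits, priority, reservedContainer);
  if (AllocationState.ALLOCATED == result.state
      || AllocationState.RESERVED == result.state) {
    // doAllocation() reads result.nodeToAllocate rather than taking the node,
    // so the caller only forwards the priority and the reserved container.
    result = doAllocation(result, priority, reservedContainer);
  }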
- application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority); + application.resetMissedNonPartitionedRequestSchedulingOpportunity( + priority); } return allocationResult; } - + private ContainerAllocation allocate(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + NodeCandidates nodeCandidatesFilter, SchedulingMode schedulingMode, ResourceLimits resourceLimits, Priority priority, RMContainer reservedContainer) { - ContainerAllocation result = - preAllocation(clusterResource, node, schedulingMode, resourceLimits, - priority, reservedContainer); + // Check partition resource + if (null == reservedContainer) { + // pre-check when allocating new container + ContainerAllocation result = preCheckForNewContainer(clusterResource, + nodeCandidatesFilter.getPartition(), schedulingMode, resourceLimits, + priority); + if (null != result) { + return result; + } + } + + // When trying to allocate reserved container, only look at reserved node, + // otherwise look at nodes ordered by scorer + Iterator iter; + if (null == reservedContainer && application.getCapacitySchedulerContext() + .getConfiguration().getBoolean( + CapacitySchedulerConfiguration.ENABLE_GLOBAL_SCHEDULING, + CapacitySchedulerConfiguration.DEFAULT_GLOBAL_SCHEDULING_ENABLED)) { + SchedulerNodesScorer scorer = + SchedulerNodesScorerCache.getOrCreateScorer(application, priority); + iter = scorer.scoreNodeCandidates( + nodeCandidatesFilter); + } else { + iter = IteratorUtils.singletonIterator( + nodeCandidatesFilter.getNextAvailable()); + } + + ContainerAllocation result = ContainerAllocation.PRIORITY_SKIPPED; + while (iter.hasNext()) { + FiCaSchedulerNode node = iter.next(); - if (AllocationState.ALLOCATED == result.state - || AllocationState.RESERVED == result.state) { - result = doAllocation(result, node, priority, reservedContainer); + // Avoid node with <= 0 resources OR has reserved containers when not + // allocate from reserved container + if ((node.getReservedContainer() != null && reservedContainer == null) + || Resources.lessThanOrEqual(rc, clusterResource, + node.getUnallocatedResource(), Resources.none())) { + continue; + } + + // FIXME: part of the preAllocation can be extracted to avoid duplicated + // check for resource-requests across nodes. + result = preAllocation(clusterResource, node, schedulingMode, + resourceLimits, priority, reservedContainer); + + if (AllocationState.ALLOCATED == result.state + || AllocationState.RESERVED == result.state) { + result = doAllocation(result, priority, reservedContainer); + break; + } } return result; } - + @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, - RMContainer reservedContainer) { + NodeCandidates nodeCandidatesFilter, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, RMContainer reservedContainer) { if (reservedContainer == null) { // Check if application needs more resource, skip if it doesn't need more. 
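The rewritten allocate() above is the first consumer of the NodeCandidates and scorer classes added later in this patch. A minimal usage sketch for the heartbeat case follows; type parameters are omitted, csConf stands for the CapacitySchedulerConfiguration obtained via application.getCapacitySchedulerContext().getConfiguration() as in the hunk above, and ENABLE_GLOBAL_SCHEDULING / DEFAULT_GLOBAL_SCHEDULING_ENABLED are the constants the patch reads:

  // One node just heartbeated; wrap it as the candidate set.
  NodeCandidates candidates =
      new NodeCandidates(node, Arrays.asList(node), node.getPartition());

  boolean globalScheduling = csConf.getBoolean(
      CapacitySchedulerConfiguration.ENABLE_GLOBAL_SCHEDULING,
      CapacitySchedulerConfiguration.DEFAULT_GLOBAL_SCHEDULING_ENABLED);

  Iterator iter;
  if (reservedContainer == null && globalScheduling) {
    // Rank every schedulable node for this application attempt and priority.
    iter = SchedulerNodesScorerCache.getOrCreateScorer(application, priority)
        .scoreNodeCandidates(candidates);
  } else {
    // Heartbeat-driven or reserved-container path: just the node at hand.
    iter = IteratorUtils.singletonIterator(candidates.getNextAvailable());
  }

Note that DoNotCareNodesScorer degenerates to the same single-node iteration, so the two paths converge for applications without locality preferences.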
- if (!application.hasPendingResourceRequest(rc, - node.getPartition(), clusterResource, schedulingMode)) { + if (!application.hasPendingResourceRequest(rc, nodeCandidatesFilter.getPartition(), + clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-label=" + node.getPartition()); + + schedulingMode.name() + " node-label=" + nodeCandidatesFilter.getPartition()); } return CSAssignment.SKIP_ASSIGNMENT; } - + // Schedule in priority order for (Priority priority : application.getPriorities()) { - ContainerAllocation result = - allocate(clusterResource, node, schedulingMode, resourceLimits, - priority, null); + ContainerAllocation result = allocate(clusterResource, + nodeCandidatesFilter, + schedulingMode, resourceLimits, priority, null); AllocationState allocationState = result.getAllocationState(); if (allocationState == AllocationState.PRIORITY_SKIPPED) { continue; } - return getCSAssignmentFromAllocateResult(clusterResource, result, - null); + return getCSAssignmentFromAllocateResult(clusterResource, result, null); } // We will reach here if we skipped all priorities of the app, so we will // skip the app. return CSAssignment.SKIP_ASSIGNMENT; } else { - ContainerAllocation result = - allocate(clusterResource, node, schedulingMode, resourceLimits, - reservedContainer.getReservedPriority(), reservedContainer); + ContainerAllocation result = allocate(clusterResource, + nodeCandidatesFilter, + schedulingMode, resourceLimits, + reservedContainer.getReservedPriority(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, reservedContainer); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NodeCandidates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NodeCandidates.java new file mode 100644 index 0000000..876c5e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NodeCandidates.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class NodeCandidates { + private N nextAvailable; + private Map allSchedulableNodes; + private String partition; + + public NodeCandidates(N nextAvailable, Collection allSchedulable, + String partition) { + this.nextAvailable = nextAvailable; + this.allSchedulableNodes = new HashMap<>(); + if (null != allSchedulable) { + for (N n : allSchedulable) { + allSchedulableNodes.put(n.getNodeID(), n); + } + } + this.partition = partition; + } + + /* + * "I don't care, just give me next node to allocate" + */ + public N getNextAvailable() { + return nextAvailable; + } + + /* + * "I'm picky, give me all you have and I will decide" + */ + public Map getAllSchedulableNodes() { + return allSchedulableNodes; + } + + public String getPartition() { + return partition; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 8009580..625cf60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -491,7 +492,7 @@ public LeafQueue getCSLeafQueue() { } public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits currentResourceLimits, + NodeCandidates nodeCandidatesFilter, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, RMContainer reservedContainer) { if (LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " @@ -500,7 +501,8 @@ public CSAssignment assignContainers(Resource clusterResource, } synchronized (this) { - return containerAllocator.assignContainers(clusterResource, node, + return containerAllocator.assignContainers(clusterResource, + nodeCandidatesFilter, schedulingMode, currentResourceLimits, reservedContainer); } } @@ -603,4 +605,8 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) { updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString()); } } + + public CapacitySchedulerContext getCapacitySchedulerContext() { + return capacitySchedulerContext; + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/DoNotCareNodesScorer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/DoNotCareNodesScorer.java new file mode 100644 index 0000000..fc2f8ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/DoNotCareNodesScorer.java @@ -0,0 +1,16 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.commons.collections.iterators.SingletonIterator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; + +import java.util.Iterator; + +public class DoNotCareNodesScorer + implements SchedulerNodesScorer { + @Override + public Iterator scoreNodeCandidates( + NodeCandidates candidates) { + return new SingletonIterator(candidates.getNextAvailable()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/LocalityNodesScorer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/LocalityNodesScorer.java new file mode 100644 index 0000000..26a5491 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/LocalityNodesScorer.java @@ -0,0 +1,133 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class LocalityNodesScorer + implements SchedulerNodesScorer { + private SchedulerApplicationAttempt attempt; + private Priority priority; + private long lastInitializedTime = 0; + + private ConcurrentLinkedQueue nodeLocalHosts; + private ConcurrentLinkedQueue rackLocalHosts; + private ConcurrentLinkedQueue offswitchHosts; + + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + + public LocalityNodesScorer(SchedulerApplicationAttempt attempt, + Priority priority) { + this.attempt = attempt; + this.priority = priority; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = 
lock.writeLock(); + } + + private void reinitializeIfNeeded(NodeCandidates candidates) { + // Do not reinitialize in 5000 ms. + // FIXME: this should be configurable and will be forced to update when + // Requirement changes, etc. + if (System.currentTimeMillis() - 5000L < lastInitializedTime) { + return; + } + + lastInitializedTime = System.currentTimeMillis(); + + try { + writeLock.lock(); + if (null == nodeLocalHosts) { + nodeLocalHosts = new ConcurrentLinkedQueue<>(); + rackLocalHosts = new ConcurrentLinkedQueue<>(); + offswitchHosts = new ConcurrentLinkedQueue<>(); + } else { + nodeLocalHosts.clear(); + rackLocalHosts.clear(); + offswitchHosts.clear(); + } + + // We don't need any resource + boolean needResource = attempt.getResourceRequest(priority, + ResourceRequest.ANY).getNumContainers() > 0; + if (!needResource) { + return; + } + + for (Map.Entry entry : candidates.getAllSchedulableNodes().entrySet()) { + NodeId nodeId = entry.getKey(); + N node = entry.getValue(); + String rack = node.getRackName(); + + ResourceRequest rr = attempt.getAppSchedulingInfo().getResourceRequest( + priority, nodeId.getHost()); + if (rr != null && rr.getNumContainers() > 0) { + nodeLocalHosts.add(node); + } else { + rr = attempt.getAppSchedulingInfo().getResourceRequest(priority, rack); + boolean hasRackLocalRequest = rr != null && rr.getNumContainers() > 0; + if (hasRackLocalRequest) { + rackLocalHosts.add(node); + } else { + offswitchHosts.add(node); + } + } + } + } finally { + writeLock.unlock(); + } + } + + private void moveFirstToLast(ConcurrentLinkedQueue queue) { + N n = null; + try { + n = queue.poll(); + } catch (NoSuchElementException e) { + // do nothing; + } + + if (n != null) { + queue.add(n); + } + } + + @Override + public Iterator scoreNodeCandidates( + NodeCandidates candidates) { + reinitializeIfNeeded(candidates); + + try { + writeLock.lock(); + moveFirstToLast(nodeLocalHosts); + moveFirstToLast(rackLocalHosts); + moveFirstToLast(offswitchHosts); + } finally { + writeLock.unlock(); + } + + try { + readLock.lock(); + return IteratorUtils.chainedIterator( + new Iterator[] { nodeLocalHosts.iterator(), rackLocalHosts.iterator(), + offswitchHosts.iterator() }); + } finally { + readLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorer.java new file mode 100644 index 0000000..3f976ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorer.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeCandidates; + +import java.util.Iterator; + +public interface SchedulerNodesScorer { + Iterator scoreNodeCandidates(NodeCandidates candidates); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java new file mode 100644 index 0000000..c6d815d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java @@ -0,0 +1,67 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.commons.collections.map.LRUMap; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Map; + +/** + * Do necessary caching for scorer according to type and applications + */ +public class SchedulerNodesScorerCache { + // At most store 10K objects + private static LRUMap lruCache = new LRUMap(1024 * 10); + + private static SchedulerNodesScorerType getSchedulerNodesScorerType( + SchedulerApplicationAttempt attempt, Priority priority) { + Map requests = attempt.getResourceRequests( + priority); + + // Simplest rule to determine with nodes scorer will be used: + // When requested #resourceName > 0, use locality, otherwise use DO_NOT_CARE + if (requests != null && requests.size() > 1) { + return SchedulerNodesScorerType.LOCALITY; + } + + return SchedulerNodesScorerType.DO_NOT_CARE; + } + + public static SchedulerNodesScorer getOrCreateScorer( + SchedulerApplicationAttempt attempt, Priority priority) { + SchedulerNodesScorerType type = getSchedulerNodesScorerType(attempt, + priority); + + return getOrCreateScorer(attempt, priority, type); + } + + public static SchedulerNodesScorer getOrCreateScorer( + SchedulerApplicationAttempt attempt, Priority priority, + SchedulerNodesScorerType type) { + String key = + attempt.getApplicationAttemptId().toString() + priority.toString(); + SchedulerNodesScorer scorer; + scorer = (SchedulerNodesScorer) lruCache.get(key); + + if (null == scorer) { + // FIXME, for simple, create scorer every time. 
We can cache scorer + // without any issue + switch (type) { + case LOCALITY: + scorer = new LocalityNodesScorer<>(attempt, priority); + break; + case DO_NOT_CARE: + scorer = new DoNotCareNodesScorer<>(); + break; + default: + return null; + } + + lruCache.put(key, scorer); + } + + return scorer; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java new file mode 100644 index 0000000..5d5b817 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java @@ -0,0 +1,6 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +public enum SchedulerNodesScorerType { + DO_NOT_CARE, // Any node is fine + LOCALITY, // Locality-based +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 72d2f85..676ff18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1242,7 +1242,7 @@ public void testPreemptionInfo() throws Exception { rm1.stop(); } - @Test(timeout = 30000) + @Test(timeout = 300000) public void testRecoverRequestAfterPreemption() throws Exception { Configuration conf = new Configuration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,