diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 082ec14d433..9f49880201f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -46,9 +46,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;

 /**
@@ -82,8 +82,8 @@
   private final ConcurrentSkipListSet<SchedulerRequestKey> schedulerKeys =
       new ConcurrentSkipListSet<>();
-  final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
-      schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
+  private final Map<SchedulerRequestKey, AppPlacementAllocator<SchedulerNode>>
+      schedulerKeyToAppPlacementAllocator = new ConcurrentHashMap<>();

   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -146,7 +146,7 @@ public boolean isPending() {
    */
   private void clearRequests() {
     schedulerKeys.clear();
-    schedulerKeyToPlacementSets.clear();
+    schedulerKeyToAppPlacementAllocator.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }

@@ -190,9 +190,9 @@ public boolean updateResourceRequests(List<ResourceRequest> requests,
         dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
       }

-      // Update scheduling placement set
+      // Update AppPlacementAllocator by dedup requests.
       offswitchResourcesUpdated =
-          addToPlacementSets(
+          addRequestToAppPlacement(
               recoverPreemptedRequestForAContainer, dedupRequests);

       return offswitchResourcesUpdated;
@@ -201,11 +201,11 @@ public boolean updateResourceRequests(List<ResourceRequest> requests,
     }
   }

-  public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) {
-    schedulerKeyToPlacementSets.remove(schedulerRequestKey);
+  public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) {
+    schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey);
   }

-  boolean addToPlacementSets(
+  boolean addRequestToAppPlacement(
       boolean recoverPreemptedRequestForAContainer,
       Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
     boolean offswitchResourcesUpdated = false;
@@ -213,14 +213,15 @@ boolean addToPlacementSets(
     for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry :
         dedupRequests.entrySet()) {
       SchedulerRequestKey schedulerRequestKey = entry.getKey();

-      if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
-        schedulerKeyToPlacementSets.put(schedulerRequestKey,
-            new LocalitySchedulingPlacementSet<>(this));
+      if (!schedulerKeyToAppPlacementAllocator.containsKey(
+          schedulerRequestKey)) {
+        schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
+            new LocalityAppPlacementAllocator<>(this));
       }

-      // Update placement set
+      // Update AppPlacementAllocator
       ResourceRequestUpdateResult pendingAmountChanges =
-          schedulerKeyToPlacementSets.get(schedulerRequestKey)
+          schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey)
               .updateResourceRequests(
                   entry.getValue().values(),
                   recoverPreemptedRequestForAContainer);
@@ -244,7 +245,7 @@ private void updatePendingResources(ResourceRequest lastRequest,
     if (request.getNumContainers() <= 0) {
       if (lastRequestContainers >= 0) {
         schedulerKeys.remove(schedulerKey);
-        schedulerKeyToPlacementSets.remove(schedulerKey);
+        schedulerKeyToAppPlacementAllocator.remove(schedulerKey);
       }
       LOG.info("checking for deactivate of application :"
           + this.applicationId);
@@ -356,8 +357,9 @@ public boolean getAndResetBlacklistChanged() {
     List<ResourceRequest> ret = new ArrayList<>();
     try {
       this.readLock.lock();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        ret.addAll(ps.getResourceRequests().values());
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        ret.addAll(ap.getResourceRequests().values());
       }
     } finally {
       this.readLock.unlock();
     }
@@ -384,8 +386,9 @@ public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey,
       String resourceName) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName);
+      AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
+          schedulerKey);
+      return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName);
     } finally {
       this.readLock.unlock();
     }
@@ -424,7 +427,7 @@ public boolean isPlaceBlacklisted(String resourceName,
         updateMetricsForAllocatedContainer(type, node, containerAllocated);
       }

-      return schedulerKeyToPlacementSets.get(schedulerKey).allocate(
+      return schedulerKeyToAppPlacementAllocator.get(schedulerKey).allocate(
           schedulerKey, type, node);
     } finally {
       writeLock.unlock();
@@ -442,23 +445,24 @@ public void move(Queue newQueue) {
       this.writeLock.lock();
       QueueMetrics oldMetrics = queue.getMetrics();
       QueueMetrics newMetrics = newQueue.getMetrics();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
         if (ask.getCount() > 0) {
           oldMetrics.decrPendingResources(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());
           newMetrics.incrPendingResources(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());

           Resource delta = Resources.multiply(ask.getPerAllocationResource(),
               ask.getCount());
           // Update Queue
           queue.decPendingResource(
-              ps.getPrimaryRequestedNodePartition(), delta);
+              ap.getPrimaryRequestedNodePartition(), delta);
           newQueue.incPendingResource(
-              ps.getPrimaryRequestedNodePartition(), delta);
+              ap.getPrimaryRequestedNodePartition(), delta);
         }
       }
       oldMetrics.moveAppFrom(this);
@@ -477,15 +481,16 @@ public void stop() {
     try {
       this.writeLock.lock();
       QueueMetrics metrics = queue.getMetrics();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
         if (ask.getCount() > 0) {
-          metrics.decrPendingResources(ps.getPrimaryRequestedNodePartition(),
+          metrics.decrPendingResources(ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());

           // Update Queue
           queue.decPendingResource(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               Resources.multiply(ask.getPerAllocationResource(),
                   ask.getCount()));
         }
@@ -559,11 +564,12 @@ public boolean checkAllocation(NodeType type, SchedulerNode node,
       SchedulerRequestKey schedulerKey) {
     try {
       readLock.lock();
-      SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
-      if (null == ps) {
+      AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
+          schedulerKey);
+      if (null == ap) {
         return false;
       }
-      return ps.canAllocate(type, node);
+      return ap.canAllocate(type, node);
     } finally {
       readLock.unlock();
     }
@@ -593,11 +599,10 @@ private void updateMetricsForAllocatedContainer(NodeType type,
     metrics.incrNodeTypeAggregations(user, type);
   }

-  // Get placement-set by specified schedulerKey
-  // Now simply return all node of the input clusterPlacementSet
-  public SchedulingPlacementSet<SchedulerNode> getSchedulingPlacementSet(
+  // Get AppPlacementAllocator by specified schedulerKey
+  public AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator(
       SchedulerRequestKey schedulerkey) {
-    return (SchedulingPlacementSet<SchedulerNode>) schedulerKeyToPlacementSets.get(
+    return (AppPlacementAllocator<SchedulerNode>) schedulerKeyToAppPlacementAllocator.get(
         schedulerkey);
   }

@@ -614,9 +619,9 @@ public boolean canDelayTo(
       SchedulerRequestKey schedulerKey, String resourceName) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps =
-          schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps == null) || ps.canDelayTo(resourceName);
+      AppPlacementAllocator ap =
+          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
+      return (ap == null) || ap.canDelayTo(resourceName);
     } finally {
       this.readLock.unlock();
     }
@@ -626,9 +631,9 @@ public boolean acceptNodePartition(SchedulerRequestKey schedulerKey,
       String nodePartition, SchedulingMode schedulingMode) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps =
-          schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps != null) && ps.acceptNodePartition(nodePartition,
+      AppPlacementAllocator ap =
+          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
+      return (ap != null) && ap.acceptNodePartition(nodePartition,
           schedulingMode);
     } finally {
       this.readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 5ac2ac5918e..93995a1dcfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -34,8 +34,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
     .RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
-    .SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;

@@ -146,17 +145,17 @@ public synchronized boolean checkAndAddToOutstandingIncreases(
           createResourceRequests(rmContainer, schedulerNode,
               schedulerKey, resToIncrease);
       updateResReqs.put(schedulerKey, resMap);
-      appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+      appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs);
     }
     return true;
   }

   private void cancelPreviousRequest(SchedulerNode schedulerNode,
       SchedulerRequestKey schedulerKey) {
-    SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet =
-        appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
-    if (schedulingPlacementSet != null) {
-      Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet
+    AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
+        appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
+    if (appPlacementAllocator != null) {
+      Map<String, ResourceRequest> resourceRequests = appPlacementAllocator
           .getResourceRequests();
       ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
       // Decrement the pending using a dummy RR with
@@ -290,7 +289,7 @@ public ContainerId matchContainerToOutstandingIncreaseReq(
           (rmContainer, node, schedulerKey,
           rmContainer.getContainer().getResource());
       reqsToUpdate.put(schedulerKey, resMap);
-      appSchedulingInfo.addToPlacementSets(true, reqsToUpdate);
+      appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate);
       return UNDEFINED;
     }
     return retVal;
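Not part of the patch: a minimal sketch of how scheduler-side code reads pending state through the renamed AppSchedulingInfo API. The helper class and method are hypothetical; the calls (`getAppPlacementAllocator`, `getPendingAsk`, `PendingAsk.getCount`) are the ones introduced or kept by the hunks above.

    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
    import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

    /** Hypothetical helper; illustrates the renamed lookup path only. */
    final class PendingAskProbe {
      static boolean hasOutstandingAnyAsk(AppSchedulingInfo info,
          SchedulerRequestKey key) {
        // Was getSchedulingPlacementSet() before this patch.
        AppPlacementAllocator<SchedulerNode> allocator =
            info.getAppPlacementAllocator(key);
        if (allocator == null) {
          return false;
        }
        // ResourceRequest.ANY is the off-switch ask tracked per scheduler key.
        PendingAsk ask = info.getPendingAsk(key, ResourceRequest.ANY);
        return ask.getCount() > 0;
      }
    }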
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index ce71afa7ef7..3f7497356c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -79,7 +79,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;

@@ -316,9 +316,9 @@ public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey,
       String resourceName) {
     try {
       readLock.lock();
-      SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet(
+      AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator(
           schedulerKey);
-      return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName);
+      return ap == null ? 0 : ap.getOutstandingAsksCount(resourceName);
     } finally {
       readLock.unlock();
     }
@@ -617,13 +617,13 @@ public void showRequests() {
     try {
       readLock.lock();
       for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
-        SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey);
-        if (ps != null &&
-            ps.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
+        AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey);
+        if (ap != null &&
+            ap.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
           LOG.debug("showRequests:" + " application=" + getApplicationId()
               + " headRoom=" + getHeadroom() + " currentConsumption="
               + attemptResourceUsage.getUsed().getMemorySize());
-          ps.showRequests();
+          ap.showRequests();
         }
       }
     } finally {
@@ -1334,14 +1334,14 @@ protected void setAttemptRecovering(boolean isRecovering) {
     this.isAttemptRecovering = isRecovering;
   }

-  public SchedulingPlacementSet getSchedulingPlacementSet(
+  public AppPlacementAllocator getAppPlacementAllocator(
       SchedulerRequestKey schedulerRequestKey) {
-    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
+    return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey);
   }

   public Map<String, ResourceRequest> getResourceRequests(
       SchedulerRequestKey schedulerRequestKey) {
-    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey)
+    return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey)
         .getResourceRequests();
   }

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 12aff02ca10..0c351b671a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -32,7 +32,7 @@
 /**
  * Utility for logging scheduler activities
  */
-// FIXME: make sure PlacementSet works with this class
+// FIXME: make sure CandidateNodeSet works with this class
 public class ActivitiesLogger {
   private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);
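An illustrative consumer of the renamed per-attempt API (not in the patch): it mirrors the `showRequests()` pattern above to count scheduler keys that still have outstanding asks. The class and method names are hypothetical; `getSchedulerKeys()`, `getAppPlacementAllocator()`, and `getOutstandingAsksCount()` are taken from the hunks above.

    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
    import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

    /** Hypothetical helper; counts keys with pending ANY asks. */
    final class OutstandingAskCounter {
      static int countActiveKeys(SchedulerApplicationAttempt attempt) {
        int active = 0;
        for (SchedulerRequestKey key : attempt.getSchedulerKeys()) {
          AppPlacementAllocator ap = attempt.getAppPlacementAllocator(key);
          // Same null-check-then-query idiom used by showRequests() above.
          if (ap != null
              && ap.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
            active++;
          }
        }
        return active;
      }
    }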
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 250f4e6b9a7..183cb362781 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -65,7 +65,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;

@@ -876,7 +876,7 @@ public Resource getTotalKillableResource(String partition) {
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits resourceLimits,
       SchedulingMode schedulingMode) {
-    return assignContainers(clusterResource, new SimplePlacementSet<>(node),
+    return assignContainers(clusterResource, new SimpleCandidateNodeSet<>(node),
         resourceLimits, schedulingMode);
   }

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 3a17d1b057d..43e7f532872 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -46,12 +45,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;

 /**
  * CSQueue represents a node in the tree of
@@ -188,15 +185,16 @@ public void finishApplicationAttempt(FiCaSchedulerApp application,
   /**
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
-   * @param ps {@link PlacementSet} of nodes which resources are available
+   * @param candidates {@link CandidateNodeSet} the nodes that are considered
+   *                   for the current placement.
    * @param resourceLimits how much overall resource of this queue can use.
   * @param schedulingMode Type of exclusive check when assign container on a
    *                       NodeManager, see {@link SchedulingMode}.
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
-      SchedulingMode schedulingMode);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits resourceLimits, SchedulingMode schedulingMode);

   /**
    * A container assigned to the queue has completed.
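A sketch of the call pattern the new `CSQueue#assignContainers` signature implies, mirroring the AbstractCSQueue convenience overload above. `queue`, `cluster`, `node`, and `limits` are hypothetical locals, not names from the patch.

    // Wrap a single heartbeating node into a CandidateNodeSet, then ask the
    // queue hierarchy for an assignment. SimpleCandidateNodeSet is the
    // single-node implementation introduced by this rename.
    CandidateNodeSet<FiCaSchedulerNode> candidates =
        new SimpleCandidateNodeSet<>(node);
    CSAssignment assignment = queue.assignContainers(cluster, candidates,
        limits, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);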
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ca289b1d2fd..dd97ca8ac57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -132,9 +132,9 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -1183,7 +1183,7 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
   /**
    * We need to make sure when doing allocation, Node should be existed
-   * And we will construct a {@link PlacementSet} before proceeding
+   * And we will construct a {@link CandidateNodeSet} before proceeding
    */
   private void allocateContainersToNode(NodeId nodeId,
       boolean withNodeHeartbeat) {
@@ -1192,8 +1192,10 @@
     int offswitchCount = 0;
     int assignedContainers = 0;

-    PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
-    CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
+    CandidateNodeSet<FiCaSchedulerNode> candidates =
+        new SimpleCandidateNodeSet<>(node);
+    CSAssignment assignment = allocateContainersToNode(candidates,
+        withNodeHeartbeat);

     // Only check if we can allocate more container on the same node when
     // scheduling is triggered by node heartbeat
     if (null != assignment && withNodeHeartbeat) {
@@ -1210,7 +1212,7 @@
           assignedContainers)) {
         // Try to see if it is possible to allocate multiple container for
         // the same node heartbeat
-        assignment = allocateContainersToNode(ps, true);
+        assignment = allocateContainersToNode(candidates, true);

         if (null != assignment
             && assignment.getType() == NodeType.OFF_SWITCH) {
@@ -1237,8 +1239,9 @@
   /*
    * Logics of allocate container on a single node (Old behavior)
    */
-  private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNode> ps,
-      FiCaSchedulerNode node, boolean withNodeHeartbeat) {
+  private CSAssignment allocateContainerOnSingleNode(
+      CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
+      boolean withNodeHeartbeat) {
     // Backward compatible way to make sure previous behavior which allocation
     // driven by node heartbeat works.
     if (getNode(node.getNodeID()) != node) {
@@ -1262,7 +1265,7 @@ private CSAssignment allocateContainerOnSingleNode(
   private CSAssignment allocateOrReserveNewContainers(
-      PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      boolean withNodeHeartbeat) {
     CSAssignment assignment = getRootQueue().assignContainers(
-        getClusterResource(), ps, new ResourceLimits(labelManager
-            .getResourceByLabel(ps.getPartition(), getClusterResource())),
+        getClusterResource(), candidates, new ResourceLimits(labelManager
+            .getResourceByLabel(candidates.getPartition(),
+                getClusterResource())),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

     assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
@@ -1346,30 +1351,34 @@ private CSAssignment allocateOrReserveNewContainers(
         assignment.getResource(), Resources.none())) {
       if (withNodeHeartbeat) {
         updateSchedulerHealth(lastNodeUpdateTime,
-            PlacementSetUtils.getSingleNode(ps).getNodeID(), assignment);
+            CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(),
+            assignment);
       }
       return assignment;
     }

     // Only do non-exclusive allocation when node has node-labels.
-    if (StringUtils.equals(ps.getPartition(), RMNodeLabelsManager.NO_LABEL)) {
+    if (StringUtils.equals(candidates.getPartition(),
+        RMNodeLabelsManager.NO_LABEL)) {
       return null;
     }

     // Only do non-exclusive allocation when the node-label supports that
     try {
       if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
-          ps.getPartition())) {
+          candidates.getPartition())) {
         return null;
       }
     } catch (IOException e) {
-      LOG.warn("Exception when trying to get exclusivity of node label=" + ps
+      LOG.warn(
+          "Exception when trying to get exclusivity of node label=" + candidates
           .getPartition(), e);
       return null;
     }

     // Try to use NON_EXCLUSIVE
-    assignment = getRootQueue().assignContainers(getClusterResource(), ps,
+    assignment = getRootQueue().assignContainers(getClusterResource(),
+        candidates,
         // TODO, now we only consider limits for parent for non-labeled
         // resources, should consider labeled resources as well.
         new ResourceLimits(labelManager
@@ -1386,13 +1395,14 @@ private CSAssignment allocateOrReserveNewContainers(
   /*
    * New behavior, allocate containers considering multiple nodes
    */
   private CSAssignment allocateContainersOnMultiNodes(
-      PlacementSet<FiCaSchedulerNode> ps) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates) {
     // When this time look at multiple nodes, try schedule if the
     // partition has any available resource or killable resource
     if (getRootQueue().getQueueCapacities().getUsedCapacity(
-        ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource(
-        CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
-        .none()) {
+        candidates.getPartition()) >= 1.0f
+        && preemptionManager.getKillableResource(
+            CapacitySchedulerConfiguration.ROOT, candidates.getPartition())
+            == Resources.none()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("This node or this node partition doesn't have available or"
             + "killable resource");
@@ -1400,11 +1410,12 @@ private CSAssignment allocateContainersOnMultiNodes(
       return null;
     }

-    return allocateOrReserveNewContainers(ps, false);
+    return allocateOrReserveNewContainers(candidates, false);
   }

   @VisibleForTesting
-  CSAssignment allocateContainersToNode(PlacementSet<FiCaSchedulerNode> ps,
+  CSAssignment allocateContainersToNode(
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
       boolean withNodeHeartbeat) {
     if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
         .isSchedulerReadyForAllocatingContainers()) {
@@ -1413,14 +1424,14 @@ CSAssignment allocateContainersToNode(
     // Backward compatible way to make sure previous behavior which allocation
     // driven by node heartbeat works.
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

     // We have two different logics to handle allocation on single node / multi
     // nodes.
     if (null != node) {
-      return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat);
-    } else {
-      return allocateContainersOnMultiNodes(ps);
+      return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat);
+    } else {
+      return allocateContainersOnMultiNodes(candidates);
     }
   }
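The dispatch above is the pivot point of the whole patch: a CandidateNodeSet that resolves to exactly one node keeps the old heartbeat-driven path, while anything else flows to the new multi-node path. A condensed restatement of that pattern (locals are hypothetical; the methods are the ones in the hunk):

    // If the candidate set contains a single node, preserve the legacy
    // heartbeat-driven behavior; otherwise consider all candidates at once.
    FiCaSchedulerNode single = CandidateNodeSetUtils.getSingleNode(candidates);
    CSAssignment result = (single != null)
        ? allocateContainerOnSingleNode(candidates, single, withNodeHeartbeat)
        : allocateContainersOnMultiNodes(candidates);

Keeping both paths behind one entry point is what lets the rename land without changing scheduling behavior for existing single-node heartbeats.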
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index f24e30aa1ee..096d087974e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -69,8 +69,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -970,10 +970,10 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
   }

-  private CSAssignment allocateFromReservedContainer(
-      Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps,
+  private CSAssignment allocateFromReservedContainer(Resource clusterResource,
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
     if (null == node) {
       return null;
     }
@@ -987,7 +987,8 @@ private CSAssignment allocateFromReservedContainer(
       ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
           node.getNodeID(), SystemClock.getInstance().getTime(), application);
       CSAssignment assignment = application.assignContainers(clusterResource,
-          ps, currentResourceLimits, schedulingMode, reservedContainer);
+          candidates, currentResourceLimits, schedulingMode,
+          reservedContainer);
       return assignment;
     }
   }
@@ -997,43 +998,44 @@ private CSAssignment allocateFromReservedContainer(

   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
     updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

     if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: partition=" + ps.getPartition()
+      LOG.debug("assignContainers: partition=" + candidates.getPartition()
           + " #applications=" + orderingPolicy.getNumSchedulableEntities());
     }

-    setPreemptionAllowed(currentResourceLimits, ps.getPartition());
+    setPreemptionAllowed(currentResourceLimits, candidates.getPartition());

     // Check for reserved resources, try to allocate reserved container first.
     CSAssignment assignment = allocateFromReservedContainer(clusterResource,
-        ps, currentResourceLimits, schedulingMode);
+        candidates, currentResourceLimits, schedulingMode);
     if (null != assignment) {
       return assignment;
     }

     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(ps.getPartition())) {
+        && !accessibleToPartition(candidates.getPartition())) {
       ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
           getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + ps
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + candidates
               .getPartition());
       return CSAssignment.NULL_ASSIGNMENT;
     }

     // Check if this queue need more resource, simply skip allocation if this
     // queue doesn't need more resources.
-    if (!hasPendingResourceRequest(ps.getPartition(), clusterResource,
+    if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
         schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
             + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + ps.getPartition());
+            + schedulingMode.name() + " node-partition=" + candidates
+            .getPartition());
       }
       ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
           getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
@@ -1078,7 +1080,8 @@ public CSAssignment assignContainers(Resource clusterResource,
         cachedUserLimit = cul.userLimit;
       }
       Resource userLimit = computeUserLimitAndSetHeadroom(application,
-          clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit);
+          clusterResource, candidates.getPartition(), schedulingMode,
+          cachedUserLimit);
       if (cul == null) {
         cul = new CachedUserLimit(userLimit);
         userLimits.put(application.getUser(), cul);
@@ -1106,7 +1109,7 @@ public CSAssignment assignContainers(Resource clusterResource,

       // Try to schedule
       assignment = application.assignContainers(clusterResource,
-          ps, currentResourceLimits, schedulingMode, null);
+          candidates, currentResourceLimits, schedulingMode, null);

       if (LOG.isDebugEnabled()) {
         LOG.debug("post-assignContainers for application " + application
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 6800b74f8d4..959ca51eb2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -65,8 +65,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;

 @Private
@@ -479,16 +479,16 @@ private String getParentName() {

   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
-      SchedulingMode schedulingMode) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits resourceLimits, SchedulingMode schedulingMode) {
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(ps.getPartition())) {
+        && !accessibleToPartition(candidates.getPartition())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
-            + ", because it is not able to access partition=" + ps
+            + ", because it is not able to access partition=" + candidates
             .getPartition());
       }

@@ -506,12 +506,12 @@ public CSAssignment assignContainers(Resource clusterResource,

     // Check if this queue need more resource, simply skip allocation if this
     // queue doesn't need more resources.
-    if (!super.hasPendingResourceRequest(ps.getPartition(), clusterResource,
-        schedulingMode)) {
+    if (!super.hasPendingResourceRequest(candidates.getPartition(),
+        clusterResource, schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
             + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + ps
+            + schedulingMode.name() + " node-partition=" + candidates
             .getPartition());
       }

@@ -538,7 +538,8 @@ public CSAssignment assignContainers(Resource clusterResource,
     // Are we over maximum-capacity for this queue?
     // This will also consider parent's limits and also continuous reservation
     // looking
-    if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(),
+    if (!super.canAssignToThisQueue(clusterResource,
+        candidates.getPartition(),
         resourceLimits, Resources
             .createResource(getMetrics().getReservedMB(),
                 getMetrics().getReservedVirtualCores()), schedulingMode)) {
@@ -556,7 +557,7 @@ public CSAssignment assignContainers(Resource clusterResource,

     // Schedule
     CSAssignment assignedToChild = assignContainersToChildQueues(
-        clusterResource, ps, resourceLimits, schedulingMode);
+        clusterResource, candidates, resourceLimits, schedulingMode);
     assignment.setType(assignedToChild.getType());
     assignment.setRequestLocalityType(
         assignedToChild.getRequestLocalityType());
@@ -710,7 +711,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child,
   }

   private CSAssignment assignContainersToChildQueues(Resource cluster,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits limits,
+      CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits,
       SchedulingMode schedulingMode) {
     CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;

@@ -719,7 +720,7 @@ private CSAssignment assignContainersToChildQueues(Resource cluster,
     // Try to assign to most 'under-served' sub-queue
     for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
-        ps.getPartition()); iter.hasNext(); ) {
+        candidates.getPartition()); iter.hasNext(); ) {
       CSQueue childQueue = iter.next();
       if(LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
@@ -729,10 +730,10 @@ private CSAssignment assignContainersToChildQueues(Resource cluster,
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, parentLimits,
-              ps.getPartition());
-
-      CSAssignment childAssignment = childQueue.assignContainers(cluster, ps,
-          childLimits, schedulingMode);
+              candidates.getPartition());
+
+      CSAssignment childAssignment = childQueue.assignContainers(cluster,
+          candidates, childLimits, schedulingMode);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
             " stats: " + childQueue + " --> " +
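Condensed from the ParentQueue hunk above (locals are hypothetical): the same CandidateNodeSet instance flows down the queue tree unchanged; only the per-child ResourceLimits are recomputed at each level.

    // Per child: derive limits for this partition, then delegate with the
    // unchanged candidate set. The child may be another ParentQueue or a
    // LeafQueue; both share the CSQueue#assignContainers signature above.
    ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster,
        parentLimits, candidates.getPartition());
    CSAssignment childAssignment = childQueue.assignContainers(cluster,
        candidates, childLimits, schedulingMode);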
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index 5809d86e6d0..95e05337f83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -20,7 +20,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -34,7 +33,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;

@@ -176,13 +175,14 @@ protected CSAssignment getCSAssignmentFromAllocateResult(
    *
    * @param clusterResource clusterResource
-   * @param ps PlacementSet
+   * @param candidates CandidateNodeSet
    * @param schedulingMode scheduling mode (exclusive or nonexclusive)
    * @param resourceLimits resourceLimits
    * @param reservedContainer reservedContainer
    * @return CSAssignment proposal
    */
   public abstract CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, RMContainer reservedContainer);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      RMContainer reservedContainer);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 4879fae0562..9df03b84a8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -21,16 +21,14 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;

 public class ContainerAllocator extends AbstractContainerAllocator {
   private AbstractContainerAllocator regularContainerAllocator;
@@ -50,10 +48,11 @@ public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc,

   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      RMContainer reservedContainer) {
     return regularContainerAllocator.assignContainers(clusterResource,
-        ps, schedulingMode, resourceLimits, reservedContainer);
+        candidates, schedulingMode, resourceLimits, reservedContainer);
   }
 }
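For illustration only, a hypothetical no-op subclass showing the shape of the new abstract method; it is not part of the patch, and the super-constructor arguments are an assumption based on the `(application, rc, ...)` prefix visible in ContainerAllocator's constructor above.

    // Hypothetical sketch; constructor signature assumed, not confirmed.
    public class NoOpContainerAllocator extends AbstractContainerAllocator {
      public NoOpContainerAllocator(FiCaSchedulerApp application,
          ResourceCalculator rc, RMContext rmContext) {
        super(application, rc, rmContext);
      }

      @Override
      public CSAssignment assignContainers(Resource clusterResource,
          CandidateNodeSet<FiCaSchedulerNode> candidates,
          SchedulingMode schedulingMode, ResourceLimits resourceLimits,
          RMContainer reservedContainer) {
        // Always skip; a real implementation would pick nodes from
        // `candidates` and build a CSAssignment proposal.
        return CSAssignment.SKIP_ASSIGNMENT;
      }
    }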
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 72dfbdd6dfb..69e90c68a43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@@ -50,9 +51,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -91,15 +91,16 @@ private boolean checkHeadroom(Resource clusterResource,

   /*
    * Pre-check if we can allocate a pending resource request
-   * (given schedulerKey) to a given PlacementSet.
+   * (given schedulerKey) to a given CandidateNodeSet.
    * We will consider stuffs like exclusivity, pending resource, node partition,
    * headroom, etc.
    */
-  private ContainerAllocation preCheckForPlacementSet(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
+  private ContainerAllocation preCheckForNodeCandidateSet(
+      Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      SchedulerRequestKey schedulerKey) {
     Priority priority = schedulerKey.getPriority();
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

     PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
         ResourceRequest.ANY);
@@ -164,7 +165,7 @@ private ContainerAllocation preCheckForPlacementSet(Resource clusterResource,
     }

     if (!checkHeadroom(clusterResource, resourceLimits, required,
-        ps.getPartition())) {
+        candidates.getPartition())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("cannot allocate required resource=" + required
             + " because of headroom");
@@ -182,7 +183,7 @@ private ContainerAllocation preCheckForPlacementSet(Resource clusterResource,
     // Only do this when request associated with given scheduler key accepts
     // NO_LABEL under RESPECT_EXCLUSIVITY mode
     if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL,
-        appInfo.getSchedulingPlacementSet(schedulerKey)
+        appInfo.getAppPlacementAllocator(schedulerKey)
             .getPrimaryRequestedNodePartition())) {
       missedNonPartitionedRequestSchedulingOpportunity =
           application.addMissedNonPartitionedRequestSchedulingOpportunity(
@@ -265,7 +266,7 @@ public float getLocalityWaitFactor(
       SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = Math.max(
-        application.getSchedulingPlacementSet(schedulerKey)
+        application.getAppPlacementAllocator(schedulerKey)
             .getUniqueLocationAsks() - 1, 0);

     // waitFactor can't be more than '1'
@@ -780,15 +781,15 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
   }

   private ContainerAllocation allocate(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
-      RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      SchedulerRequestKey schedulerKey, RMContainer reservedContainer) {
     // Do checks before determining which node to allocate
     // Directly return if this check fails.
     ContainerAllocation result;
     if (reservedContainer == null) {
-      result = preCheckForPlacementSet(clusterResource, ps, schedulingMode,
-          resourceLimits, schedulerKey);
+      result = preCheckForNodeCandidateSet(clusterResource, candidates,
+          schedulingMode, resourceLimits, schedulerKey);
       if (null != result) {
         return result;
       }
@@ -801,14 +802,14 @@ private ContainerAllocation allocate(Resource clusterResource,
       }
     }

-    SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS =
-        application.getAppSchedulingInfo().getSchedulingPlacementSet(
+    AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
+        application.getAppSchedulingInfo().getAppPlacementAllocator(
            schedulerKey);

     result = ContainerAllocation.PRIORITY_SKIPPED;

     Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
-        ps);
+        candidates);
     while (iter.hasNext()) {
       FiCaSchedulerNode node = iter.next();

@@ -827,19 +828,20 @@ private ContainerAllocation allocate(Resource clusterResource,

   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits,
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
       RMContainer reservedContainer) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

     if (reservedContainer == null) {
       // Check if application needs more resource, skip if it doesn't need more.
       if (!application.hasPendingResourceRequest(rc,
-          ps.getPartition(), clusterResource, schedulingMode)) {
+          candidates.getPartition(), clusterResource, schedulingMode)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
               + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-label=" + ps.getPartition());
+              + schedulingMode.name() + " node-label=" + candidates
+              .getPartition());
         }
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
             activitiesManager, node, application, application.getPriority(),
@@ -849,9 +851,8 @@ public CSAssignment assignContainers(Resource clusterResource,

       // Schedule in priority order
       for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
-        ContainerAllocation result =
-            allocate(clusterResource, ps, schedulingMode, resourceLimits,
-                schedulerKey, null);
+        ContainerAllocation result = allocate(clusterResource, candidates,
+            schedulingMode, resourceLimits, schedulerKey, null);

         AllocationState allocationState = result.getAllocationState();
         if (allocationState == AllocationState.PRIORITY_SKIPPED) {
@@ -869,7 +870,7 @@ public CSAssignment assignContainers(Resource clusterResource,
       return CSAssignment.SKIP_ASSIGNMENT;
     } else {
       ContainerAllocation result =
-          allocate(clusterResource, ps, schedulingMode, resourceLimits,
+          allocate(clusterResource, candidates, schedulingMode, resourceLimits,
               reservedContainer.getReservedSchedulerKey(), reservedContainer);
       return getCSAssignmentFromAllocateResult(clusterResource, result,
           reservedContainer, node);
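The core loop the allocator now drives, condensed from `allocate()` above: the per-application AppPlacementAllocator, not the scheduler, decides node ordering within the CandidateNodeSet. Locals are hypothetical; the calls are from the hunk.

    // Fetch the app's placement allocator for this scheduler key, then walk
    // nodes in the order it prefers, stopping once a decision is reached.
    AppPlacementAllocator<FiCaSchedulerNode> placement =
        application.getAppSchedulingInfo().getAppPlacementAllocator(
            schedulerKey);
    Iterator<FiCaSchedulerNode> it =
        placement.getPreferredNodeIterator(candidates);
    while (it.hasNext()) {
      FiCaSchedulerNode node = it.next();
      // try to allocate or reserve on `node`; break when done
    }

This inversion is the point of the rename: placement preference becomes a pluggable, per-app policy rather than a property of the node set itself.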
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -76,8 +76,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -224,10 +224,10 @@ public RMContainer allocate(FiCaSchedulerNode node,
       return null;
     }

-    SchedulingPlacementSet<FiCaSchedulerNode> ps =
-        appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+    AppPlacementAllocator<FiCaSchedulerNode> ps =
+        appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
     if (null == ps) {
-      LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName()
+      LOG.warn("Failed to get " + AppPlacementAllocator.class.getName()
           + " for application=" + getApplicationId() + " schedulerRequestKey="
           + schedulerKey);
       return null;
@@ -636,8 +636,8 @@ private boolean internalUnreserve(FiCaSchedulerNode node,
     Map<String, Resource> ret = new HashMap<>();
     for (SchedulerRequestKey schedulerKey : appSchedulingInfo
         .getSchedulerKeys()) {
-      SchedulingPlacementSet<FiCaSchedulerNode> ps =
-          appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+      AppPlacementAllocator<FiCaSchedulerNode> ps =
+          appSchedulingInfo.getAppPlacementAllocator(schedulerKey);

       String nodePartition = ps.getPrimaryRequestedNodePartition();
       Resource res = ret.get(nodePartition);
@@ -844,8 +844,9 @@ public LeafQueue getCSLeafQueue() {
   }

   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode, RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> ps,
+      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode,
+      RMContainer reservedContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("pre-assignContainers for application "
           + getApplicationId());
@@ -962,9 +963,9 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {

   @Override
   @SuppressWarnings("unchecked")
-  public SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet(
+  public AppPlacementAllocator<FiCaSchedulerNode> getAppPlacementAllocator(
       SchedulerRequestKey schedulerRequestKey) {
-    return super.getSchedulingPlacementSet(schedulerRequestKey);
+    return super.getAppPlacementAllocator(schedulerRequestKey);
   }

   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 157d2640530..bbd44188738 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -1019,7 +1019,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
     }

     if (offswitchAsk.getCount() > 0) {
-      if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks()
+      if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
           <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
         if (LOG.isTraceEnabled()) {
           LOG.trace("Assign container on " + node.getNodeName()

- * Comparing to {@link PlacementSet}, this also maintains - * pending ResourceRequests: - * - When new ResourceRequest(s) added to scheduler, or, - * - Or new container allocated, scheduler can notify corresponding - * PlacementSet. + * This class maintains two things: + * + * 1) Pending resource requests, updated when the following events happen: + * - New ResourceRequest(s) are added to the scheduler. + * - A new container is allocated. + * + * 2) The order of preferred nodes to allocate on, for a given {@link CandidateNodeSet} *

* *

- * Different set of resource requests (E.g., resource requests with the - * same schedulerKey) can have one instance of PlacementSet, each PlacementSet - * can have different ways to order nodes depends on requests. + * Each distinct set of resource requests (e.g., resource requests with the + * same schedulerKey) has one instance of AppPlacementAllocator, and each + * AppPlacementAllocator can order nodes differently depending on the + * requests. *

*/ -public interface SchedulingPlacementSet { +public interface AppPlacementAllocator { /** * Get iterator of preferred node depends on requirement and/or availability - * @param clusterPlacementSet input cluster PlacementSet + * @param candidateNodeSet input CandidateNodeSet * @return iterator of preferred node */ - Iterator getPreferredNodeIterator(PlacementSet clusterPlacementSet); + Iterator getPreferredNodeIterator(CandidateNodeSet candidateNodeSet); /** * Replace existing ResourceRequest by the new requests @@ -115,8 +118,9 @@ ResourceRequestUpdateResult updateResourceRequests( /** * Can delay to give locality? - * TODO (wangda): This should be moved out of SchedulingPlacementSet + * TODO: This should be moved out of AppPlacementAllocator * and should belong to specific delay scheduling policy impl. + * See YARN-7457 for more details. * * @param resourceName resourceName * @return can/cannot @@ -124,7 +128,7 @@ ResourceRequestUpdateResult updateResourceRequests( boolean canDelayTo(String resourceName); /** - * Does this {@link SchedulingPlacementSet} accept resources on nodePartition? + * Does this {@link AppPlacementAllocator} accept resources on nodePartition? * * @param nodePartition nodePartition * @param schedulingMode schedulingMode @@ -146,8 +150,9 @@ boolean acceptNodePartition(String nodePartition, * @return number of unique location asks with #pending greater than 0, * (like /rack1, host1, etc.). * - * TODO (wangda): This should be moved out of SchedulingPlacementSet + * TODO: This should be moved out of AppPlacementAllocator * and should belong to specific delay scheduling policy impl. + * See YARN-7457 for more details. */ int getUniqueLocationAsks(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java similarity index 65% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java index 2e6c3cab334..6f127c9f0af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java @@ -23,42 +23,38 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import java.util.Iterator; import java.util.Map; /** - *

- * PlacementSet is the central place that decide the order of node to fit - * asks by application. - *

+ * A group of nodes which the scheduler can allocate containers to. * - *

- * Also, PlacementSet can cache results (for example, ordered list) for - * better performance. - *

+ * It has the following parts: * - *

- * PlacementSet can depend on one or more other PlacementSets. - *

+ * 1) A map of nodes which are schedulable. + * 2) A version of the node set; the version should be updated whenever a + * node is added to or removed from the node set. This is used by + * {@link AppPlacementAllocator} and other classes to check whether local + * caches need to be invalidated, etc. + * 3) The node partition of the candidate set. */ @InterfaceAudience.Private @InterfaceStability.Unstable -public interface PlacementSet { +public interface CandidateNodeSet { /** - * Get all nodes for this PlacementSet - * @return all nodes for this PlacementSet + * Get all nodes for this CandidateNodeSet + * @return all nodes for this CandidateNodeSet */ Map getAllNodes(); /** - * Version of the PlacementSet, can help other PlacementSet with dependencies - * deciding if update is required + * Version of the CandidateNodeSet, which can help {@link AppPlacementAllocator} + * decide if an update is required * @return version */ long getVersion(); /** - * Partition of the PlacementSet. + * Node partition of the node set. * @return node partition */ String getPartition(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java similarity index 73% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java index 405122bee35..57fd540845b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java @@ -20,15 +20,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -public class PlacementSetUtils { +/** + * Utilities for {@link CandidateNodeSet}. + */ +public class CandidateNodeSetUtils { /* - * If the {@link CandidateNodeSet} only has one entry, return it.
otherwise * return null */ - public static N getSingleNode(PlacementSet ps) { + public static N getSingleNode( + CandidateNodeSet candidates) { N node = null; - if (1 == ps.getAllNodes().size()) { - node = ps.getAllNodes().values().iterator().next(); + if (1 == candidates.getAllNodes().size()) { + node = candidates.getAllNodes().values().iterator().next(); } return node; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java similarity index 95% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 6cc8cc72031..35351de4634 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -39,10 +39,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class LocalitySchedulingPlacementSet - implements SchedulingPlacementSet { +public class LocalityAppPlacementAllocator + implements AppPlacementAllocator { private static final Log LOG = - LogFactory.getLog(LocalitySchedulingPlacementSet.class); + LogFactory.getLog(LocalityAppPlacementAllocator.class); private final Map resourceRequestMap = new ConcurrentHashMap<>(); @@ -53,7 +53,7 @@ private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; - public LocalitySchedulingPlacementSet(AppSchedulingInfo info) { + public LocalityAppPlacementAllocator(AppSchedulingInfo info) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -63,11 +63,12 @@ public LocalitySchedulingPlacementSet(AppSchedulingInfo info) { @Override @SuppressWarnings("unchecked") public Iterator getPreferredNodeIterator( - PlacementSet clusterPlacementSet) { - // Now only handle the case that single node in placementSet - // TODO, Add support to multi-hosts inside placement-set which is passed in. + CandidateNodeSet candidateNodeSet) { + // For now, only handle the case of a single node in the candidateNodeSet. + // TODO: Add support for multiple hosts inside the candidateNodeSet that is + // passed in.
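Aside (not part of the patch): to make the single-node fast path below concrete, a minimal sketch of how the new classes interact. SimpleCandidateNodeSet and CandidateNodeSetUtils come from this change; the Mockito-mocked node and its host/port values are illustrative only.

    // A candidate set built from exactly one node short-circuits getSingleNode.
    FiCaSchedulerNode node = mock(FiCaSchedulerNode.class);
    when(node.getNodeID()).thenReturn(NodeId.newInstance("host1", 1234));
    when(node.getPartition()).thenReturn("");
    CandidateNodeSet<FiCaSchedulerNode> candidates =
        new SimpleCandidateNodeSet<>(node);
    // With one entry, getSingleNode returns that node; with zero or multiple
    // entries it returns null (multi-node candidate sets are not handled yet,
    // per the TODO above).
    assert node == CandidateNodeSetUtils.getSingleNode(candidates);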
- N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet); + N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); if (null != singleNode) { return IteratorUtils.singletonIterator(singleNode); } @@ -213,7 +214,7 @@ private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey, appSchedulingInfo.checkForDeactivation(); resourceRequestMap.remove(ResourceRequest.ANY); if (resourceRequestMap.isEmpty()) { - appSchedulingInfo.removePlacementSets(schedulerRequestKey); + appSchedulingInfo.removeAppPlacement(schedulerRequestKey); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java similarity index 80% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java index 48efaa14b27..31a21705d91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java @@ -22,24 +22,22 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import java.util.Collections; -import java.util.Iterator; import java.util.Map; /** - * A simple PlacementSet which keeps an unordered map + * A simple CandidateNodeSet which keeps an unordered map */ -public class SimplePlacementSet - implements PlacementSet { +public class SimpleCandidateNodeSet + implements CandidateNodeSet { private Map map; private String partition; - public SimplePlacementSet(N node) { + public SimpleCandidateNodeSet(N node) { if (null != node) { - // Only one node in the initial PlacementSet + // Only one node in the initial CandidateNodeSet this.map = ImmutableMap.of(node.getNodeID(), node); this.partition = node.getPartition(); } else { @@ -48,7 +46,7 @@ public SimplePlacementSet(N node) { } } - public SimplePlacementSet(Map map, String partition) { + public SimpleCandidateNodeSet(Map map, String partition) { this.map = map; this.partition = partition; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1dea4eea75f..800d023183f 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -140,7 +140,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -4199,7 +4199,8 @@ public void testSchedulingOnRemovedNode() throws Exception { scheduler.handle(new NodeRemovedSchedulerEvent( rm.getRMContext().getRMNodes().get(nm2.getNodeId()))); // schedulerNode is removed, try allocate a container - scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true); + scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node), + true); AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index e34665d2076..0fcc86d3928 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -147,9 +147,9 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). 
- when(queue) - .assignContainers(eq(clusterResource), any(PlacementSet.class), - any(ResourceLimits.class), any(SchedulingMode.class)); + when(queue).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), any(ResourceLimits.class), + any(SchedulingMode.class)); // Mock the node's resource availability Resource available = node.getUnallocatedResource(); @@ -159,9 +159,9 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { return new CSAssignment(allocatedResource, type); } - }). - when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), - any(ResourceLimits.class), any(SchedulingMode.class)); + }).when(queue).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), any(ResourceLimits.class), + any(SchedulingMode.class)); doNothing().when(node).releaseContainer(any(ContainerId.class), anyBoolean()); } @@ -425,10 +425,10 @@ public void testSortedQueues() throws Exception { clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(d,b); allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(PlacementSet.class), any(ResourceLimits.class), + any(CandidateNodeSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), any(ResourceLimits.class), + any(CandidateNodeSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 541539d892f..eacbf6e891a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -60,7 +60,7 @@ .FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; @@ -88,13 +88,14 @@ @Override public CSAssignment allocateContainersToNode( - PlacementSet ps, boolean withNodeHeartbeat) { + CandidateNodeSet candidates, + boolean withNodeHeartbeat) { try { Thread.sleep(1000); } catch(InterruptedException e) { LOG.debug("Thread interrupted."); } - return super.allocateContainersToNode(ps, withNodeHeartbeat); + return super.allocateContainersToNode(candidates, withNodeHeartbeat); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 740ef336629..71fddfc4a33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -656,7 +656,7 @@ private void checkNodePartitionOfRequestedPriority(AppSchedulingInfo info, if (key.getPriority().getPriority() == priority) { Assert.assertEquals("Expected partition is " + expectedPartition, expectedPartition, - info.getSchedulingPlacementSet(key) + info.getAppPlacementAllocator(key) .getPrimaryRequestedNodePartition()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index cdbbc519857..102466b3c98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import java.util.HashMap; @@ -54,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -181,7 +180,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)).when(queue) - .assignContainers(eq(clusterResource), any(PlacementSet.class), + .assignContainers(eq(clusterResource), any(CandidateNodeSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); // Mock the node's resource availability @@ -192,7 +191,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { return new CSAssignment(allocatedResource, type); } - }).when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), + }).when(queue).assignContainers(eq(clusterResource), any(CandidateNodeSet.class), any(ResourceLimits.class), 
any(SchedulingMode.class)); } @@ -274,13 +273,13 @@ public void testSingleLevelQueues() throws Exception { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), + any(CandidateNodeSet.class), anyResourceLimits(), any(SchedulingMode.class)); root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -293,10 +292,12 @@ public void testSingleLevelQueues() throws Exception { root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -307,10 +308,12 @@ public void testSingleLevelQueues() throws Exception { root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -325,10 +328,12 @@ public void testSingleLevelQueues() throws Exception { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(a, b); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -547,22 +552,25 @@ public 
void testMultiLevelQueues() throws Exception { root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, c, b); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - applyAllocationToQueue(clusterResource, 1*GB, a); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 1 * GB, a); root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - applyAllocationToQueue(clusterResource, 2*GB, root); + allocationOrder.verify(c).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 2 * GB, root); root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); applyAllocationToQueue(clusterResource, 2*GB, b); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); @@ -586,24 +594,28 @@ public void testMultiLevelQueues() throws Exception { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(a, a2, a1, b, c); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(a2).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); applyAllocationToQueue(clusterResource, 2*GB, a); root.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); applyAllocationToQueue(clusterResource, 2*GB, b); root.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(c).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -720,12 +732,14 @@ public void 
testOffSwitchScheduling() throws Exception { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -738,9 +752,11 @@ public void testOffSwitchScheduling() throws Exception { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -800,10 +816,12 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(b2, b3); - allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b2).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(b3).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -815,10 +833,12 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b3, b2); - allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b3).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(b2).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource);
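Aside (not part of the patch): putting the renamed pieces together, the post-patch allocation flow reduces to roughly the following sketch, condensed from the allocate() changes earlier in this diff. Activity logging, resource limit checks, reservations, and the actual per-node allocation attempt are omitted; variable names are illustrative.

    // For each scheduler key, ask that key's AppPlacementAllocator to order
    // the candidate nodes, then try them in preference order.
    for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
      AppPlacementAllocator<FiCaSchedulerNode> allocator =
          application.getAppSchedulingInfo()
              .getAppPlacementAllocator(schedulerKey);
      Iterator<FiCaSchedulerNode> iter =
          allocator.getPreferredNodeIterator(candidates);
      while (iter.hasNext()) {
        FiCaSchedulerNode node = iter.next();
        // Try to allocate for (application, schedulerKey) on this node;
        // on success, build the ContainerAllocation result and stop.
      }
    }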