diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 082ec14..9f49880 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -46,9 +46,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -82,8 +82,8 @@ private final ConcurrentSkipListSet<SchedulerRequestKey> schedulerKeys = new ConcurrentSkipListSet<>(); - final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>> - schedulerKeyToPlacementSets = new ConcurrentHashMap<>(); + private final Map<SchedulerRequestKey, AppPlacementAllocator<SchedulerNode>> + schedulerKeyToAppPlacementAllocator = new ConcurrentHashMap<>(); private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -146,7 +146,7 @@ public boolean isPending() { */ private void clearRequests() { schedulerKeys.clear(); - schedulerKeyToPlacementSets.clear(); + schedulerKeyToAppPlacementAllocator.clear(); LOG.info("Application " + applicationId + " requests cleared"); } @@ -190,9 +190,9 @@ public boolean updateResourceRequests(List<ResourceRequest> requests, dedupRequests.get(schedulerKey).put(request.getResourceName(), request); } - // Update scheduling placement set + // Update AppPlacementAllocator by dedup requests.
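A minimal sketch of the dedup step named in the comment above (illustrative only, not part of the patch; it assumes the SchedulerRequestKey.create(ResourceRequest) factory used elsewhere in this class): incoming requests are grouped by scheduler key, then keyed by resource name, so the last request for each (schedulerKey, resourceName) pair wins before the AppPlacementAllocator is updated.

    Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests =
        new HashMap<>();
    for (ResourceRequest request : requests) {
      SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
      // A later duplicate overwrites an earlier one with the same resource name.
      dedupRequests.computeIfAbsent(schedulerKey, k -> new HashMap<>())
          .put(request.getResourceName(), request);
    }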
offswitchResourcesUpdated = - addToPlacementSets( + addRequestToAppPlacement( recoverPreemptedRequestForAContainer, dedupRequests); return offswitchResourcesUpdated; @@ -201,11 +201,11 @@ } } - public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) { - schedulerKeyToPlacementSets.remove(schedulerRequestKey); + public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) { + schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey); } - boolean addToPlacementSets( + boolean addRequestToAppPlacement( boolean recoverPreemptedRequestForAContainer, Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) { boolean offswitchResourcesUpdated = false; @@ -213,14 +213,15 @@ for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : dedupRequests.entrySet()) { SchedulerRequestKey schedulerRequestKey = entry.getKey(); - if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) { - schedulerKeyToPlacementSets.put(schedulerRequestKey, - new LocalitySchedulingPlacementSet<>(this)); + if (!schedulerKeyToAppPlacementAllocator.containsKey( + schedulerRequestKey)) { + schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, + new LocalityAppPlacementAllocator<>(this)); } - // Update placement set + // Update AppPlacementAllocator ResourceRequestUpdateResult pendingAmountChanges = - schedulerKeyToPlacementSets.get(schedulerRequestKey) + schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey) .updateResourceRequests( entry.getValue().values(), recoverPreemptedRequestForAContainer); @@ -244,7 +245,7 @@ private void updatePendingResources(ResourceRequest lastRequest, if (request.getNumContainers() <= 0) { if (lastRequestContainers >= 0) { schedulerKeys.remove(schedulerKey); - schedulerKeyToPlacementSets.remove(schedulerKey); + schedulerKeyToAppPlacementAllocator.remove(schedulerKey); } LOG.info("checking for deactivate of application :" + this.applicationId); @@ -356,8 +357,9 @@ public boolean getAndResetBlacklistChanged() { List<ResourceRequest> ret = new ArrayList<>(); try { this.readLock.lock(); - for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { - ret.addAll(ps.getResourceRequests().values()); + for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator + .values()) { + ret.addAll(ap.getResourceRequests().values()); } } finally { this.readLock.unlock(); } @@ -384,8 +386,9 @@ public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey, String resourceName) { try { this.readLock.lock(); - SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey); - return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName); + AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get( + schedulerKey); + return (ap == null) ?
PendingAsk.ZERO : ap.getPendingAsk(resourceName); } finally { this.readLock.unlock(); } @@ -424,7 +427,7 @@ public boolean isPlaceBlacklisted(String resourceName, updateMetricsForAllocatedContainer(type, node, containerAllocated); } - return schedulerKeyToPlacementSets.get(schedulerKey).allocate( + return schedulerKeyToAppPlacementAllocator.get(schedulerKey).allocate( schedulerKey, type, node); } finally { writeLock.unlock(); } @@ -442,23 +445,24 @@ public void move(Queue newQueue) { this.writeLock.lock(); QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics(); - for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { - PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); + for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator + .values()) { + PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY); if (ask.getCount() > 0) { oldMetrics.decrPendingResources( - ps.getPrimaryRequestedNodePartition(), + ap.getPrimaryRequestedNodePartition(), user, ask.getCount(), ask.getPerAllocationResource()); newMetrics.incrPendingResources( - ps.getPrimaryRequestedNodePartition(), + ap.getPrimaryRequestedNodePartition(), user, ask.getCount(), ask.getPerAllocationResource()); Resource delta = Resources.multiply(ask.getPerAllocationResource(), ask.getCount()); // Update Queue queue.decPendingResource( - ps.getPrimaryRequestedNodePartition(), delta); + ap.getPrimaryRequestedNodePartition(), delta); newQueue.incPendingResource( - ps.getPrimaryRequestedNodePartition(), delta); + ap.getPrimaryRequestedNodePartition(), delta); } } oldMetrics.moveAppFrom(this); @@ -477,15 +481,16 @@ public void stop() { try { this.writeLock.lock(); QueueMetrics metrics = queue.getMetrics(); - for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { - PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); + for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator + .values()) { + PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY); if (ask.getCount() > 0) { - metrics.decrPendingResources(ps.getPrimaryRequestedNodePartition(), + metrics.decrPendingResources(ap.getPrimaryRequestedNodePartition(), user, ask.getCount(), ask.getPerAllocationResource()); // Update Queue queue.decPendingResource( - ps.getPrimaryRequestedNodePartition(), + ap.getPrimaryRequestedNodePartition(), Resources.multiply(ask.getPerAllocationResource(), ask.getCount())); } @@ -559,11 +564,12 @@ public boolean checkAllocation(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey) { try { readLock.lock(); - SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey); - if (null == ps) { + AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get( + schedulerKey); + if (null == ap) { return false; } - return ps.canAllocate(type, node); + return ap.canAllocate(type, node); } finally { readLock.unlock(); } @@ -593,11 +599,10 @@ private void updateMetricsForAllocatedContainer(NodeType type, metrics.incrNodeTypeAggregations(user, type); } - // Get placement-set by specified schedulerKey - // Now simply return all node of the input clusterPlacementSet - public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet( + // Get AppPlacementAllocator by specified schedulerKey + public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator( SchedulerRequestKey schedulerkey) { - return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get( + return (AppPlacementAllocator<N>) schedulerKeyToAppPlacementAllocator.get( schedulerkey); } @@ -614,9 +619,9 @@ public
boolean canDelayTo( SchedulerRequestKey schedulerKey, String resourceName) { try { this.readLock.lock(); - SchedulingPlacementSet ps = - schedulerKeyToPlacementSets.get(schedulerKey); - return (ps == null) || ps.canDelayTo(resourceName); + AppPlacementAllocator ap = + schedulerKeyToAppPlacementAllocator.get(schedulerKey); + return (ap == null) || ap.canDelayTo(resourceName); } finally { this.readLock.unlock(); } @@ -626,9 +631,9 @@ public boolean acceptNodePartition(SchedulerRequestKey schedulerKey, String nodePartition, SchedulingMode schedulingMode) { try { this.readLock.lock(); - SchedulingPlacementSet ps = - schedulerKeyToPlacementSets.get(schedulerKey); - return (ps != null) && ps.acceptNodePartition(nodePartition, + AppPlacementAllocator ap = + schedulerKeyToAppPlacementAllocator.get(schedulerKey); + return (ap != null) && ap.acceptNodePartition(nodePartition, schedulingMode); } finally { this.readLock.unlock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 5ac2ac5..93995a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -34,8 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer .RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement .SchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; @@ -146,17 +145,17 @@ public synchronized boolean checkAndAddToOutstandingIncreases( Map<String, ResourceRequest> resMap = createResourceRequests(rmContainer, schedulerNode, schedulerKey, resToIncrease); updateResReqs.put(schedulerKey, resMap); - appSchedulingInfo.addToPlacementSets(false, updateResReqs); + appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs); } return true; } private void cancelPreviousRequest(SchedulerNode schedulerNode, SchedulerRequestKey schedulerKey) { - SchedulingPlacementSet schedulingPlacementSet = - appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); - if (schedulingPlacementSet != null) { - Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet + AppPlacementAllocator appPlacementAllocator = + appSchedulingInfo.getAppPlacementAllocator(schedulerKey); + if (appPlacementAllocator != null) { + Map<String, ResourceRequest> resourceRequests = appPlacementAllocator .getResourceRequests(); ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY); // Decrement the pending using a dummy RR with @@ -290,7 +289,7 @@ public ContainerId matchContainerToOutstandingIncreaseReq( Map<String, ResourceRequest> resMap = createResourceRequests (rmContainer, node, schedulerKey, rmContainer.getContainer().getResource()); reqsToUpdate.put(schedulerKey, resMap); - appSchedulingInfo.addToPlacementSets(true, reqsToUpdate); + appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate); return UNDEFINED; } return
retVal; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index ce71afa..346bd20 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.common.collect.ConcurrentHashMultiset; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang.time.FastDateFormat; @@ -75,14 +74,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; - import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -91,6 +87,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ConcurrentHashMultiset; /** * Represents an application attempt from the viewpoint of the scheduler. @@ -316,9 +313,9 @@ public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey, String resourceName) { try { readLock.lock(); - SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet( + AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator( schedulerKey); - return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName); + return ap == null ? 
0 : ap.getOutstandingAsksCount(resourceName); } finally { readLock.unlock(); } @@ -617,13 +614,13 @@ public void showRequests() { try { readLock.lock(); for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { - SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey); - if (ps != null && - ps.getOutstandingAsksCount(ResourceRequest.ANY) > 0) { + AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey); + if (ap != null && + ap.getOutstandingAsksCount(ResourceRequest.ANY) > 0) { LOG.debug("showRequests:" + " application=" + getApplicationId() + " headRoom=" + getHeadroom() + " currentConsumption=" + attemptResourceUsage.getUsed().getMemorySize()); - ps.showRequests(); + ap.showRequests(); } } } finally { @@ -1334,14 +1331,14 @@ protected void setAttemptRecovering(boolean isRecovering) { this.isAttemptRecovering = isRecovering; } - public SchedulingPlacementSet getSchedulingPlacementSet( + public AppPlacementAllocator getAppPlacementAllocator( SchedulerRequestKey schedulerRequestKey) { - return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey); + return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey); } public Map<String, ResourceRequest> getResourceRequests( SchedulerRequestKey schedulerRequestKey) { - return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey) + return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey) .getResourceRequests(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 12aff02..0c351b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -32,7 +32,7 @@ /** * Utility for logging scheduler activities */ -// FIXME: make sure PlacementSet works with this class +// FIXME: make sure CandidateNodeSet works with this class public class ActivitiesLogger { private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 250f4e6..183cb36 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -876,7 +876,7 @@ public Resource getTotalKillableResource(String partition) { public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { - return assignContainers(clusterResource, new SimplePlacementSet<>(node), + return assignContainers(clusterResource, new SimpleCandidateNodeSet<>(node), resourceLimits, schedulingMode); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 3a17d1b..43e7f53 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -46,12 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; /** * CSQueue represents a node in the tree of hierarchical queues in the * {@link CapacityScheduler}. * @@ -188,15 +185,16 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, /** * Assign containers to applications in the queue or its children (if any). * @param clusterResource the resource of the cluster. - * @param ps {@link PlacementSet} of nodes which resources are available + * @param candidates {@link CandidateNodeSet} the nodes that are considered + * for the current placement. * @param resourceLimits how much overall resource of this queue can use.
* @param schedulingMode Type of exclusive check when assigning a container on a * NodeManager, see {@link SchedulingMode}. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits, - SchedulingMode schedulingMode); + CandidateNodeSet<FiCaSchedulerNode> candidates, + ResourceLimits resourceLimits, SchedulingMode schedulingMode); /** * A container assigned to the queue has completed. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index db69042..9bdc6be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -132,9 +132,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; @@ -1183,7 +1183,7 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount, /** * We need to make sure the node exists when doing allocation, - * And we will construct a {@link PlacementSet} before proceeding + * And we will construct a {@link CandidateNodeSet} before proceeding */ private void allocateContainersToNode(NodeId nodeId, boolean withNodeHeartbeat) { @@ -1192,8 +1192,10 @@ private void allocateContainersToNode(NodeId nodeId, int offswitchCount = 0; int assignedContainers = 0; - PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node); - CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat); + CandidateNodeSet<FiCaSchedulerNode> candidates = + new SimpleCandidateNodeSet<>(node); + CSAssignment assignment = allocateContainersToNode(candidates, + withNodeHeartbeat); // Only check if we can allocate more containers on the same node when // scheduling is triggered by node heartbeat if (null != assignment && withNodeHeartbeat) { @@ -1210,7 +1212,7 @@ private void allocateContainersToNode(NodeId nodeId, assignedContainers)) { // Try to see if it is possible to allocate multiple containers for // the same node heartbeat - assignment = allocateContainersToNode(ps, true); + assignment =
allocateContainersToNode(candidates, true); if (null != assignment && assignment.getType() == NodeType.OFF_SWITCH) { @@ -1237,8 +1239,9 @@ private void allocateContainersToNode(NodeId nodeId, /* * Logic of allocating a container on a single node (old behavior) */ - private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNode> ps, - FiCaSchedulerNode node, boolean withNodeHeartbeat) { + private CSAssignment allocateContainerOnSingleNode( + CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node, + boolean withNodeHeartbeat) { // Backward compatible way to make sure the previous behavior, where // allocation is driven by node heartbeat, works. if (getNode(node.getNodeID()) != node) { @@ -1262,7 +1265,7 @@ private CSAssignment allocateOrReserveNewContainers( - PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + boolean withNodeHeartbeat) { CSAssignment assignment = getRootQueue().assignContainers( - getClusterResource(), ps, new ResourceLimits(labelManager - .getResourceByLabel(ps.getPartition(), getClusterResource())), + getClusterResource(), candidates, new ResourceLimits(labelManager + .getResourceByLabel(candidates.getPartition(), + getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -1346,30 +1351,34 @@ private CSAssignment allocateOrReserveNewContainers( assignment.getResource(), Resources.none())) { if (withNodeHeartbeat) { updateSchedulerHealth(lastNodeUpdateTime, - PlacementSetUtils.getSingleNode(ps).getNodeID(), assignment); + CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(), + assignment); } return assignment; } // Only do non-exclusive allocation when node has node-labels. - if (StringUtils.equals(ps.getPartition(), RMNodeLabelsManager.NO_LABEL)) { + if (StringUtils.equals(candidates.getPartition(), + RMNodeLabelsManager.NO_LABEL)) { return null; } // Only do non-exclusive allocation when the node-label supports that try { if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( - ps.getPartition())) { + candidates.getPartition())) { return null; } } catch (IOException e) { - LOG.warn("Exception when trying to get exclusivity of node label=" + ps + LOG.warn( + "Exception when trying to get exclusivity of node label=" + candidates .getPartition(), e); return null; } // Try to use NON_EXCLUSIVE - assignment = getRootQueue().assignContainers(getClusterResource(), ps, + assignment = getRootQueue().assignContainers(getClusterResource(), + candidates, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well.
new ResourceLimits(labelManager @@ -1386,13 +1395,14 @@ private CSAssignment allocateOrReserveNewContainers( * New behavior, allocate containers considering multiple nodes */ private CSAssignment allocateContainersOnMultiNodes( - PlacementSet<FiCaSchedulerNode> ps) { + CandidateNodeSet<FiCaSchedulerNode> candidates) { // When looking at multiple nodes, try scheduling if the // partition has any available resource or killable resource if (getRootQueue().getQueueCapacities().getUsedCapacity( - ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource( - CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources - .none()) { + candidates.getPartition()) >= 1.0f + && preemptionManager.getKillableResource( + CapacitySchedulerConfiguration.ROOT, candidates.getPartition()) + == Resources.none()) { if (LOG.isDebugEnabled()) { LOG.debug("This node or this node partition doesn't have available or " + "killable resource"); @@ -1400,11 +1410,12 @@ private CSAssignment allocateContainersOnMultiNodes( return null; } - return allocateOrReserveNewContainers(ps, false); + return allocateOrReserveNewContainers(candidates, false); } @VisibleForTesting - CSAssignment allocateContainersToNode(PlacementSet<FiCaSchedulerNode> ps, + CSAssignment allocateContainersToNode( + CandidateNodeSet<FiCaSchedulerNode> candidates, boolean withNodeHeartbeat) { if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext .isSchedulerReadyForAllocatingContainers()) { @@ -1413,14 +1424,14 @@ CSAssignment allocateContainersToNode( // Backward compatible way to make sure the previous behavior, where // allocation is driven by node heartbeat, works. - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); // We have two different logics to handle allocation on single node / multi // nodes.
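For orientation, a hedged sketch of how the heartbeat-driven caller feeds this dispatch (names mirror the patch, but the snippet itself is not part of the diff): one node is wrapped into a SimpleCandidateNodeSet, and CandidateNodeSetUtils.getSingleNode(...) returning non-null is what routes the request to the single-node path below.

    FiCaSchedulerNode node = getNode(nodeId);   // assumed node lookup
    CandidateNodeSet<FiCaSchedulerNode> candidates =
        new SimpleCandidateNodeSet<>(node);
    // getSingleNode(candidates) != null => old single-node behavior;
    // a multi-node CandidateNodeSet would take the multi-node path instead.
    CSAssignment assignment =
        allocateContainersToNode(candidates, true /* withNodeHeartbeat */);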
if (null != node) { - return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat); - } else { + return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); + } else { + return allocateContainersOnMultiNodes(candidates); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f2f1baf..e8342d9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -69,8 +69,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; @@ -970,10 +970,10 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); } - private CSAssignment allocateFromReservedContainer( - Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps, + private CSAssignment allocateFromReservedContainer(Resource clusterResource, + CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); if (null == node) { return null; } @@ -987,7 +987,8 @@ private CSAssignment allocateFromReservedContainer( ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node.getNodeID(), SystemClock.getInstance().getTime(), application); CSAssignment assignment = application.assignContainers(clusterResource, - ps, currentResourceLimits, schedulingMode, reservedContainer); + candidates, currentResourceLimits, schedulingMode, + reservedContainer); return assignment; } } @@ -997,43 +998,44 @@ private CSAssignment allocateFromReservedContainer( @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits, - SchedulingMode schedulingMode) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { updateCurrentResourceLimits(currentResourceLimits, clusterResource); - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node =
CandidateNodeSetUtils.getSingleNode(candidates); if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: partition=" + ps.getPartition() + LOG.debug("assignContainers: partition=" + candidates.getPartition() + " #applications=" + orderingPolicy.getNumSchedulableEntities()); } - setPreemptionAllowed(currentResourceLimits, ps.getPartition()); + setPreemptionAllowed(currentResourceLimits, candidates.getPartition()); // Check for reserved resources, try to allocate reserved container first. CSAssignment assignment = allocateFromReservedContainer(clusterResource, - ps, currentResourceLimits, schedulingMode); + candidates, currentResourceLimits, schedulingMode); if (null != assignment) { return assignment; } // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(ps.getPartition())) { + && !accessibleToPartition(candidates.getPartition())) { ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, - ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + ps + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + candidates .getPartition()); return CSAssignment.NULL_ASSIGNMENT; } // Check if this queue need more resource, simply skip allocation if this // queue doesn't need more resources. - if (!hasPendingResourceRequest(ps.getPartition(), clusterResource, + if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + ps.getPartition()); + + schedulingMode.name() + " node-partition=" + candidates + .getPartition()); } ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, @@ -1078,7 +1080,8 @@ public CSAssignment assignContainers(Resource clusterResource, cachedUserLimit = cul.userLimit; } Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit); + clusterResource, candidates.getPartition(), schedulingMode, + cachedUserLimit); if (cul == null) { cul = new CachedUserLimit(userLimit); userLimits.put(application.getUser(), cul); @@ -1106,7 +1109,7 @@ public CSAssignment assignContainers(Resource clusterResource, // Try to schedule assignment = application.assignContainers(clusterResource, - ps, currentResourceLimits, schedulingMode, null); + candidates, currentResourceLimits, schedulingMode, null); if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " + application diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 2c288f2..d61951b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java 
@@ -65,8 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; import org.apache.hadoop.yarn.util.resource.Resources; @Private @@ -479,16 +479,16 @@ private String getParentName() { @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits, - SchedulingMode schedulingMode) { - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + CandidateNodeSet<FiCaSchedulerNode> candidates, + ResourceLimits resourceLimits, SchedulingMode schedulingMode) { + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(ps.getPartition())) { + && !accessibleToPartition(candidates.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + ps + + ", because it is not able to access partition=" + candidates .getPartition()); } @@ -506,12 +506,12 @@ public CSAssignment assignContainers(Resource clusterResource, // Check if this queue need more resource, simply skip allocation if this // queue doesn't need more resources. - if (!super.hasPendingResourceRequest(ps.getPartition(), clusterResource, - schedulingMode)) { + if (!super.hasPendingResourceRequest(candidates.getPartition(), + clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + ps + + schedulingMode.name() + " node-partition=" + candidates + .getPartition()); } @@ -538,7 +538,8 @@ public CSAssignment assignContainers(Resource clusterResource, // Are we over maximum-capacity for this queue? // This will also consider parent's limits and also continuous reservation
// This will also consider parent's limits and also continuous reservation // looking - if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(), + if (!super.canAssignToThisQueue(clusterResource, + candidates.getPartition(), resourceLimits, Resources .createResource(getMetrics().getReservedMB(), getMetrics().getReservedVirtualCores()), schedulingMode)) { @@ -556,7 +557,7 @@ public CSAssignment assignContainers(Resource clusterResource, // Schedule CSAssignment assignedToChild = assignContainersToChildQueues( - clusterResource, ps, resourceLimits, schedulingMode); + clusterResource, candidates, resourceLimits, schedulingMode); assignment.setType(assignedToChild.getType()); assignment.setRequestLocalityType( assignedToChild.getRequestLocalityType()); @@ -710,7 +711,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, } private CSAssignment assignContainersToChildQueues(Resource cluster, - PlacementSet ps, ResourceLimits limits, + CandidateNodeSet candidates, ResourceLimits limits, SchedulingMode schedulingMode) { CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; @@ -719,7 +720,7 @@ private CSAssignment assignContainersToChildQueues(Resource cluster, // Try to assign to most 'under-served' sub-queue for (Iterator iter = sortAndGetChildrenAllocationIterator( - ps.getPartition()); iter.hasNext(); ) { + candidates.getPartition()); iter.hasNext(); ) { CSQueue childQueue = iter.next(); if(LOG.isDebugEnabled()) { LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() @@ -729,10 +730,10 @@ private CSAssignment assignContainersToChildQueues(Resource cluster, // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, parentLimits, - ps.getPartition()); - - CSAssignment childAssignment = childQueue.assignContainers(cluster, ps, - childLimits, schedulingMode); + candidates.getPartition()); + + CSAssignment childAssignment = childQueue.assignContainers(cluster, + candidates, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index 5809d86..95e0533 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -20,7 +20,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -34,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -176,13 +175,14 @@ protected CSAssignment getCSAssignmentFromAllocateResult( * * * @param clusterResource clusterResource - * @param ps PlacementSet + * @param candidates CandidateNodeSet * @param schedulingMode scheduling mode (exclusive or nonexclusive) * @param resourceLimits resourceLimits * @param reservedContainer reservedContainer * @return CSAssignment proposal */ public abstract CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, RMContainer reservedContainer); + CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + RMContainer reservedContainer); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 4879fae..9df03b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -21,16 +21,14 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerAllocator extends AbstractContainerAllocator { private AbstractContainerAllocator regularContainerAllocator; @@ -50,10 +48,11 @@ public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, RMContainer
reservedContainer) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + RMContainer reservedContainer) { return regularContainerAllocator.assignContainers(clusterResource, - ps, schedulingMode, resourceLimits, reservedContainer); + candidates, schedulingMode, resourceLimits, reservedContainer); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 72dfbdd..69e90c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; @@ -50,9 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -91,15 +91,16 @@ private boolean checkHeadroom(Resource clusterResource, /* * Pre-check if we can allocate a pending resource request - * (given schedulerKey) to a given PlacementSet. + * (given schedulerKey) to a given CandidateNodeSet. * We will consider things like exclusivity, pending resource, node partition, * headroom, etc.
*/ - private ContainerAllocation preCheckForPlacementSet(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) { + private ContainerAllocation preCheckForNodeCandidateSet( + Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + SchedulerRequestKey schedulerKey) { Priority priority = schedulerKey.getPriority(); - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey, ResourceRequest.ANY); @@ -164,7 +165,7 @@ private ContainerAllocation preCheckForPlacementSet(Resource clusterResource, } if (!checkHeadroom(clusterResource, resourceLimits, required, - ps.getPartition())) { + candidates.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug("cannot allocate required resource=" + required + " because of headroom"); @@ -182,7 +183,7 @@ private ContainerAllocation preCheckForPlacementSet(Resource clusterResource, // Only do this when request associated with given scheduler key accepts // NO_LABEL under RESPECT_EXCLUSIVITY mode if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, - appInfo.getSchedulingPlacementSet(schedulerKey) + appInfo.getAppPlacementAllocator(schedulerKey) .getPrimaryRequestedNodePartition())) { missedNonPartitionedRequestSchedulingOpportunity = application.addMissedNonPartitionedRequestSchedulingOpportunity( @@ -265,7 +266,7 @@ public float getLocalityWaitFactor( SchedulerRequestKey schedulerKey, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) int requiredResources = Math.max( - application.getSchedulingPlacementSet(schedulerKey) + application.getAppPlacementAllocator(schedulerKey) .getUniqueLocationAsks() - 1, 0); // waitFactor can't be more than '1' @@ -780,15 +781,15 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, } private ContainerAllocation allocate(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, - RMContainer reservedContainer) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { // Do checks before determining which node to allocate // Directly return if this check fails.
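A worked example of the locality-wait estimate in getLocalityWaitFactor above, with assumed values (the -1 discounts the ANY ask, assuming it is counted by getUniqueLocationAsks()):

    // An app asking on host1, host2, /rack1 and ANY has 4 unique location
    // asks, so requiredResources = max(4 - 1, 0) = 3; in a 40-node cluster
    // the wait factor is min(3 / 40.0, 1.0) = 0.075.
    int uniqueLocationAsks = 4;   // assumed example value
    int clusterNodes = 40;        // assumed example value
    int requiredResources = Math.max(uniqueLocationAsks - 1, 0);
    float localityWaitFactor =
        Math.min(((float) requiredResources / clusterNodes), 1.0f);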
ContainerAllocation result; if (reservedContainer == null) { - result = preCheckForPlacementSet(clusterResource, ps, schedulingMode, - resourceLimits, schedulerKey); + result = preCheckForNodeCandidateSet(clusterResource, candidates, + schedulingMode, resourceLimits, schedulerKey); if (null != result) { return result; } @@ -801,14 +802,14 @@ private ContainerAllocation allocate(Resource clusterResource, } } - SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS = - application.getAppSchedulingInfo().getSchedulingPlacementSet( schedulerKey); + AppPlacementAllocator<FiCaSchedulerNode> schedulingPS = + application.getAppSchedulingInfo().getAppPlacementAllocator( schedulerKey); result = ContainerAllocation.PRIORITY_SKIPPED; Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator( - ps); + candidates); while (iter.hasNext()) { FiCaSchedulerNode node = iter.next(); @@ -827,19 +828,20 @@ private ContainerAllocation allocate(Resource clusterResource, @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, + CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); if (reservedContainer == null) { // Check if application needs more resource, skip if it doesn't need more. if (!application.hasPendingResourceRequest(rc, - ps.getPartition(), clusterResource, schedulingMode)) { + candidates.getPartition(), clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-label=" + ps.getPartition()); + + schedulingMode.name() + " node-label=" + candidates + .getPartition()); } ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, application.getPriority(), @@ -849,9 +851,8 @@ public CSAssignment assignContainers(Resource clusterResource, // Schedule in priority order for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { - ContainerAllocation result = - allocate(clusterResource, ps, schedulingMode, resourceLimits, - schedulerKey, null); + ContainerAllocation result = allocate(clusterResource, candidates, + schedulingMode, resourceLimits, schedulerKey, null); AllocationState allocationState = result.getAllocationState(); if (allocationState == AllocationState.PRIORITY_SKIPPED) { @@ -869,7 +870,7 @@ public CSAssignment assignContainers(Resource clusterResource, return CSAssignment.SKIP_ASSIGNMENT; } else { ContainerAllocation result = - allocate(clusterResource, ps, schedulingMode, resourceLimits, + allocate(clusterResource, candidates, schedulingMode, resourceLimits, reservedContainer.getReservedSchedulerKey(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, reservedContainer, node); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index a12c5ec..40405fc 100644 ---
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -76,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -224,10 +224,10 @@ public RMContainer allocate(FiCaSchedulerNode node, return null; } - SchedulingPlacementSet ps = - appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); + AppPlacementAllocator ps = + appSchedulingInfo.getAppPlacementAllocator(schedulerKey); if (null == ps) { - LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName() + LOG.warn("Failed to get " + AppPlacementAllocator.class.getName() + " for application=" + getApplicationId() + " schedulerRequestKey=" + schedulerKey); return null; @@ -636,8 +636,8 @@ private boolean internalUnreserve(FiCaSchedulerNode node, Map<String, Resource> ret = new HashMap<>(); for (SchedulerRequestKey schedulerKey : appSchedulingInfo .getSchedulerKeys()) { - SchedulingPlacementSet ps = - appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); + AppPlacementAllocator ps = + appSchedulingInfo.getAppPlacementAllocator(schedulerKey); String nodePartition = ps.getPrimaryRequestedNodePartition(); Resource res = ret.get(nodePartition); @@ -844,8 +844,9 @@ public LeafQueue getCSLeafQueue() { } public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits, - SchedulingMode schedulingMode, RMContainer reservedContainer) { + CandidateNodeSet<FiCaSchedulerNode> ps, + ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, + RMContainer reservedContainer) { if (LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " + getApplicationId()); @@ -962,9 +963,9 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) { @Override @SuppressWarnings("unchecked") - public SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet( + public AppPlacementAllocator<FiCaSchedulerNode> getAppPlacementAllocator( SchedulerRequestKey schedulerRequestKey) { - return super.getSchedulingPlacementSet(schedulerRequestKey); + return super.getAppPlacementAllocator(schedulerRequestKey); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 157d264..bbd4418 100644 ---
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -1019,7 +1019,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { } if (offswitchAsk.getCount() > 0) { - if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks() + if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks() <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) { if (LOG.isTraceEnabled()) { LOG.trace("Assign container on " + node.getNodeName() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java new file mode 100644 index 0000000..63b22a3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + *
+ * This class has the following functionality:
+ *
+ * 1) It keeps track of pending resource requests when the following events
+ * happen:
+ * - New ResourceRequests are added to the scheduler.
+ * - New containers get allocated.
+ *
+ * 2) It determines the order in which the nodes given in a
+ * {@link CandidateNodeSet} will be used for allocating containers.
+ *
+ * Each distinct set of resource requests (e.g., resource requests with the
+ * same schedulerKey) maps to one instance of AppPlacementAllocator, and each
+ * AppPlacementAllocator can order nodes differently depending on its
+ * requests.
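+ *
+ * A minimal usage sketch (illustrative only; the surrounding scheduler
+ * objects such as appSchedulingInfo, schedulerKey and candidates are
+ * assumed to exist and are not part of this patch):
+ *
+ * <pre>{@code
+ * AppPlacementAllocator<FiCaSchedulerNode> allocator =
+ *     appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
+ * Iterator<FiCaSchedulerNode> nodes =
+ *     allocator.getPreferredNodeIterator(candidates);
+ * while (nodes.hasNext()) {
+ *   FiCaSchedulerNode node = nodes.next();
+ *   // try to allocate one container on this preferred node
+ * }
+ * }</pre>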
+ */
+public interface AppPlacementAllocator<N extends SchedulerNode> {
+  /**
+   * Get an iterator of preferred nodes, depending on the requirements and/or
+   * node availability.
+   * @param candidateNodeSet input CandidateNodeSet
+   * @return iterator of preferred nodes
+   */
+  Iterator<N> getPreferredNodeIterator(CandidateNodeSet<N> candidateNodeSet);
+
+  /**
+   * Replace the existing ResourceRequests with the newly given requests.
+   *
+   * @param requests new ResourceRequests
+   * @param recoverPreemptedRequestForAContainer true if we're recovering
+   *          resource requests for a preempted container
+   * @return the update result if the total pending resource changed,
+   *         null otherwise
+   */
+  ResourceRequestUpdateResult updateResourceRequests(
+      Collection<ResourceRequest> requests,
+      boolean recoverPreemptedRequestForAContainer);
+
+  /**
+   * Get the pending ResourceRequests for this allocator's scheduler key.
+   * @return Map of resourceName to ResourceRequest
+   */
+  Map<String, ResourceRequest> getResourceRequests();
+
+  /**
+   * Get the pending ask for a given resourceName. If there's no such
+   * pending ask, returns {@link PendingAsk#ZERO}.
+   *
+   * @param resourceName resourceName
+   * @return PendingAsk
+   */
+  PendingAsk getPendingAsk(String resourceName);
+
+  /**
+   * Get #pending-allocations for a given resourceName. If there's no such
+   * pending ask, returns 0.
+   *
+   * @param resourceName resourceName
+   * @return #pending-allocations
+   */
+  int getOutstandingAsksCount(String resourceName);
+
+  /**
+   * Notify this allocator that a container has been allocated.
+   * @param schedulerKey SchedulerRequestKey for this ResourceRequest
+   * @param type type of the allocation
+   * @param node which node this container was allocated on
+   * @return list of ResourceRequests deducted
+   */
+  List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
+      NodeType type, SchedulerNode node);
+
+  /**
+   * Returns the accepted resourceNames.
+   * @return Iterator over the accepted resourceNames
+   */
+  Iterator<String> getAcceptedResouceNames();
+
+  /**
+   * Check whether a pending requirement remains for the given NodeType and
+   * node.
+   * @param type locality type
+   * @param node which node we will allocate on
+   * @return true if there is a pending requirement
+   */
+  boolean canAllocate(NodeType type, SchedulerNode node);
+
+  /**
+   * Can the scheduler delay allocation to achieve locality for the given
+   * resourceName?
+   * TODO: This should be moved out of AppPlacementAllocator
+   * and should belong to specific delay scheduling policy impl.
+   * See YARN-7457 for more details.
+   *
+   * @param resourceName resourceName
+   * @return can/cannot
+   */
+  boolean canDelayTo(String resourceName);
+
+  /**
+   * Does this {@link AppPlacementAllocator} accept resources on the given
+   * nodePartition?
+   *
+   * @param nodePartition nodePartition
+   * @param schedulingMode schedulingMode
+   * @return accepted/not
+   */
+  boolean acceptNodePartition(String nodePartition,
+      SchedulingMode schedulingMode);
+
+  /**
+   * It is possible that one request can accept multiple node partitions,
+   * so this method returns the primary node partition for pending resource /
+   * headroom calculation.
+   *
+   * @return primary requested node partition
+   */
+  String getPrimaryRequestedNodePartition();
+
+  /**
+   * @return number of unique location asks with #pending greater than 0
+   *         (like /rack1, host1, etc.).
+   *
+   * TODO: This should be moved out of AppPlacementAllocator
+   * and should belong to specific delay scheduling policy impl.
+   * See YARN-7457 for more details.
+   */
+  int getUniqueLocationAsks();
+
+  /**
+   * Print human-readable requests to LOG at debug level.
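+   * Only requests with a positive number of pending containers are printed.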
+   */
+  void showRequests();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
new file mode 100644
index 0000000..6f127c9
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Map;
+
+/**
+ * A group of nodes which can be allocated by the scheduler.
+ *
+ * It contains the following parts:
+ *
+ * 1) A map of nodes which are schedulable.
+ * 2) A version of the node set; the version should be updated if any node is
+ * added to or removed from the node set. This is used by
+ * {@link AppPlacementAllocator} or other classes to check whether it's
+ * required to invalidate local caches, etc.
+ * 3) The node partition of the candidate set.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface CandidateNodeSet<N extends SchedulerNode> {
+  /**
+   * Get all nodes for this CandidateNodeSet.
+   * @return all nodes for this CandidateNodeSet
+   */
+  Map<NodeId, N> getAllNodes();
+
+  /**
+   * Version of the CandidateNodeSet, which can help
+   * {@link AppPlacementAllocator} to decide whether an update is required.
+   * @return version
+   */
+  long getVersion();
+
+  /**
+   * Node partition of the node set.
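+   *
+   * For example (an illustrative scenario, not part of this patch): a
+   * candidate set built from nodes in a "gpu" partition would return "gpu"
+   * here, while a set of default-partition nodes returns the empty label.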
+   * @return node partition
+   */
+  String getPartition();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
new file mode 100644
index 0000000..940ebf4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * Utility methods for {@link CandidateNodeSet}.
+ */
+public class CandidateNodeSetUtils {
+
+  private CandidateNodeSetUtils() {
+  }
+
+  /*
+   * If the {@link CandidateNodeSet} only has one entry, return it. Otherwise,
+   * return null.
+   */
+  public static <N extends SchedulerNode> N getSingleNode(
+      CandidateNodeSet<N> candidates) {
+    N node = null;
+    if (1 == candidates.getAllNodes().size()) {
+      node = candidates.getAllNodes().values().iterator().next();
+    }
+
+    return node;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
new file mode 100644
index 0000000..7f89435
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is an implementation of the {@link AppPlacementAllocator} that takes
+ * into account locality preferences (node, rack, any) when allocating
+ * containers.
+ */
+public class LocalityAppPlacementAllocator<N extends SchedulerNode>
+    implements AppPlacementAllocator<N> {
+  private static final Log LOG =
+      LogFactory.getLog(LocalityAppPlacementAllocator.class);
+
+  private final Map<String, ResourceRequest> resourceRequestMap =
+      new ConcurrentHashMap<>();
+  private AppSchedulingInfo appSchedulingInfo;
+  private volatile String primaryRequestedPartition =
+      RMNodeLabelsManager.NO_LABEL;
+
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+
+  public LocalityAppPlacementAllocator(AppSchedulingInfo info) {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+    this.appSchedulingInfo = info;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Iterator<N> getPreferredNodeIterator(
+      CandidateNodeSet<N> candidateNodeSet) {
+    // For now we only handle the case where there is a single node in the
+    // candidateNodeSet.
+    // TODO: add support for multiple hosts inside the candidateNodeSet that
+    // is passed in.
+
+    N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
+    if (null != singleNode) {
+      return IteratorUtils.singletonIterator(singleNode);
+    }
+
+    return IteratorUtils.emptyIterator();
+  }
+
+  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
+      ResourceRequest requestTwo) {
+    String requestOneLabelExp = requestOne.getNodeLabelExpression();
+    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
+    // If the first request's label expression is null and the second one's
+    // is not, we have to consider the label as changed.
+    if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
+      return true;
+    }
+    // Otherwise, when requestOneLabelExp is not null, the label has changed
+    // if the two expressions do not match.
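+    // (Hypothetical example: the previous ANY request carried label "x" and
+    // the updated ANY request carries label "y"; the label has changed, so
+    // updateNodeLabels relabels the non-ANY requests accordingly.)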
+ return ((null != requestOneLabelExp) && !(requestOneLabelExp + .equals(requestTwoLabelExp))); + } + + private void updateNodeLabels(ResourceRequest request) { + String resourceName = request.getResourceName(); + if (resourceName.equals(ResourceRequest.ANY)) { + ResourceRequest previousAnyRequest = + getResourceRequest(resourceName); + + // When there is change in ANY request label expression, we should + // update label for all resource requests already added of same + // priority as ANY resource request. + if ((null == previousAnyRequest) || hasRequestLabelChanged( + previousAnyRequest, request)) { + for (ResourceRequest r : resourceRequestMap.values()) { + if (!r.getResourceName().equals(ResourceRequest.ANY)) { + r.setNodeLabelExpression(request.getNodeLabelExpression()); + } + } + } + } else{ + ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY); + if (anyRequest != null) { + request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); + } + } + } + + @Override + public ResourceRequestUpdateResult updateResourceRequests( + Collection requests, + boolean recoverPreemptedRequestForAContainer) { + try { + this.writeLock.lock(); + + ResourceRequestUpdateResult updateResult = null; + + // Update resource requests + for (ResourceRequest request : requests) { + String resourceName = request.getResourceName(); + + // Update node labels if required + updateNodeLabels(request); + + // Increment number of containers if recovering preempted resources + ResourceRequest lastRequest = resourceRequestMap.get(resourceName); + if (recoverPreemptedRequestForAContainer && lastRequest != null) { + request.setNumContainers(lastRequest.getNumContainers() + 1); + } + + // Update asks + resourceRequestMap.put(resourceName, request); + + if (resourceName.equals(ResourceRequest.ANY)) { + String partition = request.getNodeLabelExpression() == null ? + RMNodeLabelsManager.NO_LABEL : + request.getNodeLabelExpression(); + + this.primaryRequestedPartition = partition; + + //update the applications requested labels set + appSchedulingInfo.addRequestedPartition(partition); + + updateResult = new ResourceRequestUpdateResult(lastRequest, request); + } + } + return updateResult; + } finally { + this.writeLock.unlock(); + } + } + + @Override + public Map getResourceRequests() { + return resourceRequestMap; + } + + private ResourceRequest getResourceRequest(String resourceName) { + return resourceRequestMap.get(resourceName); + } + + @Override + public PendingAsk getPendingAsk(String resourceName) { + try { + readLock.lock(); + ResourceRequest request = getResourceRequest(resourceName); + if (null == request) { + return PendingAsk.ZERO; + } else{ + return new PendingAsk(request.getCapability(), + request.getNumContainers()); + } + } finally { + readLock.unlock(); + } + + } + + @Override + public int getOutstandingAsksCount(String resourceName) { + try { + readLock.lock(); + ResourceRequest request = getResourceRequest(resourceName); + if (null == request) { + return 0; + } else{ + return request.getNumContainers(); + } + } finally { + readLock.unlock(); + } + + } + + private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey, + ResourceRequest offSwitchRequest) { + int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; + offSwitchRequest.setNumContainers(numOffSwitchContainers); + + // Do we have any outstanding requests? 
+    // If there is nothing, we need to deactivate this application
+    if (numOffSwitchContainers == 0) {
+      appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey);
+      appSchedulingInfo.checkForDeactivation();
+      resourceRequestMap.remove(ResourceRequest.ANY);
+      if (resourceRequestMap.isEmpty()) {
+        appSchedulingInfo.removeAppPlacement(schedulerRequestKey);
+      }
+    }
+
+    appSchedulingInfo.decPendingResource(
+        offSwitchRequest.getNodeLabelExpression(),
+        offSwitchRequest.getCapability());
+  }
+
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newBuilder()
+        .priority(request.getPriority())
+        .allocationRequestId(request.getAllocationRequestId())
+        .resourceName(request.getResourceName())
+        .capability(request.getCapability())
+        .numContainers(1)
+        .relaxLocality(request.getRelaxLocality())
+        .nodeLabelExpression(request.getNodeLabelExpression()).build();
+    return newRequest;
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating rack-local resources to the
+   * application.
+   */
+  private void allocateRackLocal(SchedulerRequestKey schedulerKey,
+      SchedulerNode node, ResourceRequest rackLocalRequest,
+      List<ResourceRequest> resourceRequests) {
+    // Update future requirements
+    decResourceRequest(node.getRackName(), rackLocalRequest);
+
+    ResourceRequest offRackRequest = resourceRequestMap.get(
+        ResourceRequest.ANY);
+    decrementOutstanding(schedulerKey, offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating off-switch resources to the
+   * application.
+   */
+  private void allocateOffSwitch(SchedulerRequestKey schedulerKey,
+      ResourceRequest offSwitchRequest,
+      List<ResourceRequest> resourceRequests) {
+    // Update future requirements
+    decrementOutstanding(schedulerKey, offSwitchRequest);
+    // Update cloned OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
+  }
+
+
+  /**
+   * The {@link ResourceScheduler} is allocating node-local resources to the
+   * application.
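+   * A node-local allocation also consumes one rack-local and one ANY
+   * (off-switch) ask, since a single container satisfies all three locality
+   * levels at once.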
+ */ + private void allocateNodeLocal(SchedulerRequestKey schedulerKey, + SchedulerNode node, ResourceRequest nodeLocalRequest, + List resourceRequests) { + // Update future requirements + decResourceRequest(node.getNodeName(), nodeLocalRequest); + + ResourceRequest rackLocalRequest = resourceRequestMap.get( + node.getRackName()); + decResourceRequest(node.getRackName(), rackLocalRequest); + + ResourceRequest offRackRequest = resourceRequestMap.get( + ResourceRequest.ANY); + decrementOutstanding(schedulerKey, offRackRequest); + + // Update cloned NodeLocal, RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); + } + + private void decResourceRequest(String resourceName, + ResourceRequest request) { + request.setNumContainers(request.getNumContainers() - 1); + if (request.getNumContainers() == 0) { + resourceRequestMap.remove(resourceName); + } + } + + @Override + public boolean canAllocate(NodeType type, SchedulerNode node) { + try { + readLock.lock(); + ResourceRequest r = resourceRequestMap.get( + ResourceRequest.ANY); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { + r = resourceRequestMap.get(node.getRackName()); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + if (type == NodeType.NODE_LOCAL) { + r = resourceRequestMap.get(node.getNodeName()); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + } + } + + return true; + } finally { + readLock.unlock(); + } + } + + @Override + public boolean canDelayTo(String resourceName) { + try { + readLock.lock(); + ResourceRequest request = getResourceRequest(resourceName); + return request == null || request.getRelaxLocality(); + } finally { + readLock.unlock(); + } + + } + + @Override + public boolean acceptNodePartition(String nodePartition, + SchedulingMode schedulingMode) { + // We will only look at node label = nodeLabelToLookAt according to + // schedulingMode and partition of node. 
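+    // (Illustrative example: an app whose ANY request carries label "x" is
+    // accepted by an "x"-partition node under RESPECT_PARTITION_EXCLUSIVITY,
+    // while under IGNORE_PARTITION_EXCLUSIVITY only apps that requested the
+    // default partition match.)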
+ String nodePartitionToLookAt; + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + nodePartitionToLookAt = nodePartition; + } else { + nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; + } + + return primaryRequestedPartition.equals(nodePartitionToLookAt); + } + + @Override + public String getPrimaryRequestedNodePartition() { + return primaryRequestedPartition; + } + + @Override + public int getUniqueLocationAsks() { + return resourceRequestMap.size(); + } + + @Override + public void showRequests() { + for (ResourceRequest request : resourceRequestMap.values()) { + if (request.getNumContainers() > 0) { + LOG.debug("\tRequest=" + request); + } + } + } + + @Override + public List allocate(SchedulerRequestKey schedulerKey, + NodeType type, SchedulerNode node) { + try { + writeLock.lock(); + + List resourceRequests = new ArrayList<>(); + + ResourceRequest request; + if (type == NodeType.NODE_LOCAL) { + request = resourceRequestMap.get(node.getNodeName()); + } else if (type == NodeType.RACK_LOCAL) { + request = resourceRequestMap.get(node.getRackName()); + } else{ + request = resourceRequestMap.get(ResourceRequest.ANY); + } + + if (type == NodeType.NODE_LOCAL) { + allocateNodeLocal(schedulerKey, node, request, resourceRequests); + } else if (type == NodeType.RACK_LOCAL) { + allocateRackLocal(schedulerKey, node, request, resourceRequests); + } else{ + allocateOffSwitch(schedulerKey, request, resourceRequests); + } + + return resourceRequests; + } finally { + writeLock.unlock(); + } + } + + @Override + public Iterator getAcceptedResouceNames() { + try { + readLock.lock(); + return resourceRequestMap.keySet().iterator(); + } finally { + readLock.unlock(); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java deleted file mode 100644 index 6cc8cc7..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java +++ /dev/null @@ -1,416 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.commons.collections.IteratorUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class LocalitySchedulingPlacementSet - implements SchedulingPlacementSet { - private static final Log LOG = - LogFactory.getLog(LocalitySchedulingPlacementSet.class); - - private final Map resourceRequestMap = - new ConcurrentHashMap<>(); - private AppSchedulingInfo appSchedulingInfo; - private volatile String primaryRequestedPartition = - RMNodeLabelsManager.NO_LABEL; - - private final ReentrantReadWriteLock.ReadLock readLock; - private final ReentrantReadWriteLock.WriteLock writeLock; - - public LocalitySchedulingPlacementSet(AppSchedulingInfo info) { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - this.appSchedulingInfo = info; - } - - @Override - @SuppressWarnings("unchecked") - public Iterator getPreferredNodeIterator( - PlacementSet clusterPlacementSet) { - // Now only handle the case that single node in placementSet - // TODO, Add support to multi-hosts inside placement-set which is passed in. - - N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet); - if (null != singleNode) { - return IteratorUtils.singletonIterator(singleNode); - } - - return IteratorUtils.emptyIterator(); - } - - private boolean hasRequestLabelChanged(ResourceRequest requestOne, - ResourceRequest requestTwo) { - String requestOneLabelExp = requestOne.getNodeLabelExpression(); - String requestTwoLabelExp = requestTwo.getNodeLabelExpression(); - // First request label expression can be null and second request - // is not null then we have to consider it as changed. - if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) { - return true; - } - // If the label is not matching between both request when - // requestOneLabelExp is not null. - return ((null != requestOneLabelExp) && !(requestOneLabelExp - .equals(requestTwoLabelExp))); - } - - private void updateNodeLabels(ResourceRequest request) { - String resourceName = request.getResourceName(); - if (resourceName.equals(ResourceRequest.ANY)) { - ResourceRequest previousAnyRequest = - getResourceRequest(resourceName); - - // When there is change in ANY request label expression, we should - // update label for all resource requests already added of same - // priority as ANY resource request. 
- if ((null == previousAnyRequest) || hasRequestLabelChanged( - previousAnyRequest, request)) { - for (ResourceRequest r : resourceRequestMap.values()) { - if (!r.getResourceName().equals(ResourceRequest.ANY)) { - r.setNodeLabelExpression(request.getNodeLabelExpression()); - } - } - } - } else{ - ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY); - if (anyRequest != null) { - request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); - } - } - } - - @Override - public ResourceRequestUpdateResult updateResourceRequests( - Collection requests, - boolean recoverPreemptedRequestForAContainer) { - try { - this.writeLock.lock(); - - ResourceRequestUpdateResult updateResult = null; - - // Update resource requests - for (ResourceRequest request : requests) { - String resourceName = request.getResourceName(); - - // Update node labels if required - updateNodeLabels(request); - - // Increment number of containers if recovering preempted resources - ResourceRequest lastRequest = resourceRequestMap.get(resourceName); - if (recoverPreemptedRequestForAContainer && lastRequest != null) { - request.setNumContainers(lastRequest.getNumContainers() + 1); - } - - // Update asks - resourceRequestMap.put(resourceName, request); - - if (resourceName.equals(ResourceRequest.ANY)) { - String partition = request.getNodeLabelExpression() == null ? - RMNodeLabelsManager.NO_LABEL : - request.getNodeLabelExpression(); - - this.primaryRequestedPartition = partition; - - //update the applications requested labels set - appSchedulingInfo.addRequestedPartition(partition); - - updateResult = new ResourceRequestUpdateResult(lastRequest, request); - } - } - return updateResult; - } finally { - this.writeLock.unlock(); - } - } - - @Override - public Map getResourceRequests() { - return resourceRequestMap; - } - - private ResourceRequest getResourceRequest(String resourceName) { - return resourceRequestMap.get(resourceName); - } - - @Override - public PendingAsk getPendingAsk(String resourceName) { - try { - readLock.lock(); - ResourceRequest request = getResourceRequest(resourceName); - if (null == request) { - return PendingAsk.ZERO; - } else{ - return new PendingAsk(request.getCapability(), - request.getNumContainers()); - } - } finally { - readLock.unlock(); - } - - } - - @Override - public int getOutstandingAsksCount(String resourceName) { - try { - readLock.lock(); - ResourceRequest request = getResourceRequest(resourceName); - if (null == request) { - return 0; - } else{ - return request.getNumContainers(); - } - } finally { - readLock.unlock(); - } - - } - - private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey, - ResourceRequest offSwitchRequest) { - int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; - offSwitchRequest.setNumContainers(numOffSwitchContainers); - - // Do we have any outstanding requests? 
- // If there is nothing, we need to deactivate this application - if (numOffSwitchContainers == 0) { - appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey); - appSchedulingInfo.checkForDeactivation(); - resourceRequestMap.remove(ResourceRequest.ANY); - if (resourceRequestMap.isEmpty()) { - appSchedulingInfo.removePlacementSets(schedulerRequestKey); - } - } - - appSchedulingInfo.decPendingResource( - offSwitchRequest.getNodeLabelExpression(), - offSwitchRequest.getCapability()); - } - - public ResourceRequest cloneResourceRequest(ResourceRequest request) { - ResourceRequest newRequest = ResourceRequest.newBuilder() - .priority(request.getPriority()) - .allocationRequestId(request.getAllocationRequestId()) - .resourceName(request.getResourceName()) - .capability(request.getCapability()) - .numContainers(1) - .relaxLocality(request.getRelaxLocality()) - .nodeLabelExpression(request.getNodeLabelExpression()).build(); - return newRequest; - } - - /** - * The {@link ResourceScheduler} is allocating data-local resources to the - * application. - */ - private void allocateRackLocal(SchedulerRequestKey schedulerKey, - SchedulerNode node, ResourceRequest rackLocalRequest, - List resourceRequests) { - // Update future requirements - decResourceRequest(node.getRackName(), rackLocalRequest); - - ResourceRequest offRackRequest = resourceRequestMap.get( - ResourceRequest.ANY); - decrementOutstanding(schedulerKey, offRackRequest); - - // Update cloned RackLocal and OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(rackLocalRequest)); - resourceRequests.add(cloneResourceRequest(offRackRequest)); - } - - /** - * The {@link ResourceScheduler} is allocating data-local resources to the - * application. - */ - private void allocateOffSwitch(SchedulerRequestKey schedulerKey, - ResourceRequest offSwitchRequest, - List resourceRequests) { - // Update future requirements - decrementOutstanding(schedulerKey, offSwitchRequest); - // Update cloned OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(offSwitchRequest)); - } - - - /** - * The {@link ResourceScheduler} is allocating data-local resources to the - * application. 
- */ - private void allocateNodeLocal(SchedulerRequestKey schedulerKey, - SchedulerNode node, ResourceRequest nodeLocalRequest, - List resourceRequests) { - // Update future requirements - decResourceRequest(node.getNodeName(), nodeLocalRequest); - - ResourceRequest rackLocalRequest = resourceRequestMap.get( - node.getRackName()); - decResourceRequest(node.getRackName(), rackLocalRequest); - - ResourceRequest offRackRequest = resourceRequestMap.get( - ResourceRequest.ANY); - decrementOutstanding(schedulerKey, offRackRequest); - - // Update cloned NodeLocal, RackLocal and OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); - resourceRequests.add(cloneResourceRequest(rackLocalRequest)); - resourceRequests.add(cloneResourceRequest(offRackRequest)); - } - - private void decResourceRequest(String resourceName, - ResourceRequest request) { - request.setNumContainers(request.getNumContainers() - 1); - if (request.getNumContainers() == 0) { - resourceRequestMap.remove(resourceName); - } - } - - @Override - public boolean canAllocate(NodeType type, SchedulerNode node) { - try { - readLock.lock(); - ResourceRequest r = resourceRequestMap.get( - ResourceRequest.ANY); - if (r == null || r.getNumContainers() <= 0) { - return false; - } - if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { - r = resourceRequestMap.get(node.getRackName()); - if (r == null || r.getNumContainers() <= 0) { - return false; - } - if (type == NodeType.NODE_LOCAL) { - r = resourceRequestMap.get(node.getNodeName()); - if (r == null || r.getNumContainers() <= 0) { - return false; - } - } - } - - return true; - } finally { - readLock.unlock(); - } - } - - @Override - public boolean canDelayTo(String resourceName) { - try { - readLock.lock(); - ResourceRequest request = getResourceRequest(resourceName); - return request == null || request.getRelaxLocality(); - } finally { - readLock.unlock(); - } - - } - - @Override - public boolean acceptNodePartition(String nodePartition, - SchedulingMode schedulingMode) { - // We will only look at node label = nodeLabelToLookAt according to - // schedulingMode and partition of node. 
- String nodePartitionToLookAt; - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - nodePartitionToLookAt = nodePartition; - } else { - nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; - } - - return primaryRequestedPartition.equals(nodePartitionToLookAt); - } - - @Override - public String getPrimaryRequestedNodePartition() { - return primaryRequestedPartition; - } - - @Override - public int getUniqueLocationAsks() { - return resourceRequestMap.size(); - } - - @Override - public void showRequests() { - for (ResourceRequest request : resourceRequestMap.values()) { - if (request.getNumContainers() > 0) { - LOG.debug("\tRequest=" + request); - } - } - } - - @Override - public List allocate(SchedulerRequestKey schedulerKey, - NodeType type, SchedulerNode node) { - try { - writeLock.lock(); - - List resourceRequests = new ArrayList<>(); - - ResourceRequest request; - if (type == NodeType.NODE_LOCAL) { - request = resourceRequestMap.get(node.getNodeName()); - } else if (type == NodeType.RACK_LOCAL) { - request = resourceRequestMap.get(node.getRackName()); - } else{ - request = resourceRequestMap.get(ResourceRequest.ANY); - } - - if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(schedulerKey, node, request, resourceRequests); - } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(schedulerKey, node, request, resourceRequests); - } else{ - allocateOffSwitch(schedulerKey, request, resourceRequests); - } - - return resourceRequests; - } finally { - writeLock.unlock(); - } - } - - @Override - public Iterator getAcceptedResouceNames() { - try { - readLock.lock(); - return resourceRequestMap.keySet().iterator(); - } finally { - readLock.unlock(); - } - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java deleted file mode 100644 index 2e6c3ca..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; - -import java.util.Iterator; -import java.util.Map; - -/** - *
- * PlacementSet is the central place that decide the order of node to fit
- * asks by application.
- *
- * Also, PlacementSet can cache results (for example, ordered list) for
- * better performance.
- *
- * PlacementSet can depend on one or more other PlacementSets.
- */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public interface PlacementSet { - /** - * Get all nodes for this PlacementSet - * @return all nodes for this PlacementSet - */ - Map getAllNodes(); - - /** - * Version of the PlacementSet, can help other PlacementSet with dependencies - * deciding if update is required - * @return version - */ - long getVersion(); - - /** - * Partition of the PlacementSet. - * @return node partition - */ - String getPartition(); -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java deleted file mode 100644 index 405122b..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; - -public class PlacementSetUtils { - /* - * If the {@link PlacementSet} only has one entry, return it. otherwise - * return null - */ - public static N getSingleNode(PlacementSet ps) { - N node = null; - if (1 == ps.getAllNodes().size()) { - node = ps.getAllNodes().values().iterator().next(); - } - - return node; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java deleted file mode 100644 index 3e0620e..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - *
- * Comparing to {@link PlacementSet}, this also maintains
- * pending ResourceRequests:
- * - When new ResourceRequest(s) added to scheduler, or,
- * - Or new container allocated, scheduler can notify corresponding
- * PlacementSet.
- *
- * Different set of resource requests (E.g., resource requests with the
- * same schedulerKey) can have one instance of PlacementSet, each PlacementSet
- * can have different ways to order nodes depends on requests.
- */ -public interface SchedulingPlacementSet { - /** - * Get iterator of preferred node depends on requirement and/or availability - * @param clusterPlacementSet input cluster PlacementSet - * @return iterator of preferred node - */ - Iterator getPreferredNodeIterator(PlacementSet clusterPlacementSet); - - /** - * Replace existing ResourceRequest by the new requests - * - * @param requests new ResourceRequests - * @param recoverPreemptedRequestForAContainer if we're recovering resource - * requests for preempted container - * @return true if total pending resource changed - */ - ResourceRequestUpdateResult updateResourceRequests( - Collection requests, - boolean recoverPreemptedRequestForAContainer); - - /** - * Get pending ResourceRequests by given schedulerRequestKey - * @return Map of resourceName to ResourceRequest - */ - Map getResourceRequests(); - - /** - * Get pending ask for given resourceName. If there's no such pendingAsk, - * returns {@link PendingAsk#ZERO} - * - * @param resourceName resourceName - * @return PendingAsk - */ - PendingAsk getPendingAsk(String resourceName); - - /** - * Get #pending-allocations for given resourceName. If there's no such - * pendingAsk, returns 0 - * - * @param resourceName resourceName - * @return #pending-allocations - */ - int getOutstandingAsksCount(String resourceName); - - /** - * Notify container allocated. - * @param schedulerKey SchedulerRequestKey for this ResourceRequest - * @param type Type of the allocation - * @param node Which node this container allocated on - * @return list of ResourceRequests deducted - */ - List allocate(SchedulerRequestKey schedulerKey, - NodeType type, SchedulerNode node); - - /** - * Returns list of accepted resourceNames. - * @return Iterator of accepted resourceNames - */ - Iterator getAcceptedResouceNames(); - - /** - * We can still have pending requirement for a given NodeType and node - * @param type Locality Type - * @param node which node we will allocate on - * @return true if we has pending requirement - */ - boolean canAllocate(NodeType type, SchedulerNode node); - - /** - * Can delay to give locality? - * TODO (wangda): This should be moved out of SchedulingPlacementSet - * and should belong to specific delay scheduling policy impl. - * - * @param resourceName resourceName - * @return can/cannot - */ - boolean canDelayTo(String resourceName); - - /** - * Does this {@link SchedulingPlacementSet} accept resources on nodePartition? - * - * @param nodePartition nodePartition - * @param schedulingMode schedulingMode - * @return accepted/not - */ - boolean acceptNodePartition(String nodePartition, - SchedulingMode schedulingMode); - - /** - * It is possible that one request can accept multiple node partition, - * So this method returns primary node partition for pending resource / - * headroom calculation. - * - * @return primary requested node partition - */ - String getPrimaryRequestedNodePartition(); - - /** - * @return number of unique location asks with #pending greater than 0, - * (like /rack1, host1, etc.). - * - * TODO (wangda): This should be moved out of SchedulingPlacementSet - * and should belong to specific delay scheduling policy impl. - */ - int getUniqueLocationAsks(); - - /** - * Print human-readable requests to LOG debug. 
- */ - void showRequests(); -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java new file mode 100644 index 0000000..31a2170 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collections; +import java.util.Map; + +/** + * A simple CandidateNodeSet which keeps an unordered map + */ +public class SimpleCandidateNodeSet + implements CandidateNodeSet { + + private Map map; + private String partition; + + public SimpleCandidateNodeSet(N node) { + if (null != node) { + // Only one node in the initial CandidateNodeSet + this.map = ImmutableMap.of(node.getNodeID(), node); + this.partition = node.getPartition(); + } else { + this.map = Collections.emptyMap(); + this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION; + } + } + + public SimpleCandidateNodeSet(Map map, String partition) { + this.map = map; + this.partition = partition; + } + + @Override + public Map getAllNodes() { + return map; + } + + @Override + public long getVersion() { + return 0L; + } + + @Override + public String getPartition() { + return partition; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java deleted file mode 100644 index 48efaa1..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import com.google.common.collect.ImmutableMap; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; - -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; - -/** - * A simple PlacementSet which keeps an unordered map - */ -public class SimplePlacementSet - implements PlacementSet { - - private Map map; - private String partition; - - public SimplePlacementSet(N node) { - if (null != node) { - // Only one node in the initial PlacementSet - this.map = ImmutableMap.of(node.getNodeID(), node); - this.partition = node.getPartition(); - } else { - this.map = Collections.emptyMap(); - this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION; - } - } - - public SimplePlacementSet(Map map, String partition) { - this.map = map; - this.partition = partition; - } - - @Override - public Map getAllNodes() { - return map; - } - - @Override - public long getVersion() { - return 0L; - } - - @Override - public String getPartition() { - return partition; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java new file mode 100644 index 0000000..e8268f8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java
new file mode 100644
index 0000000..e8268f8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+ * contains classes related to scheduler placement, such as CandidateNodeSet
+ * and AppPlacementAllocator.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 1dea4ee..800d023 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -140,7 +140,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -4199,7 +4199,8 @@ public void testSchedulingOnRemovedNode() throws Exception {
     scheduler.handle(new NodeRemovedSchedulerEvent(
         rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
     // schedulerNode is removed, try allocate a container
-    scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true);
+    scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node),
+        true);
 
     AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
         new AppAttemptRemovedSchedulerEvent(
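The test change above only exercises the single-node wrapper. For completeness, a hedged sketch of the map-based constructor added in SimpleCandidateNodeSet (the helper class, method name, and two-node setup are illustrative, not from this patch):

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;

final class MultiNodeCandidatesSketch {
  // Builds a candidate set over several nodes on the default partition;
  // the caller (e.g. a test with registered NodeManagers) supplies the nodes.
  static CandidateNodeSet<FiCaSchedulerNode> of(FiCaSchedulerNode n1,
      FiCaSchedulerNode n2) {
    ImmutableMap<NodeId, FiCaSchedulerNode> nodes =
        ImmutableMap.of(n1.getNodeID(), n1, n2.getNodeID(), n2);
    return new SimpleCandidateNodeSet<>(nodes,
        NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
  }
}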
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index e34665d..0fcc86d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -147,9 +147,9 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
       // Next call - nothing
       if (allocation > 0) {
         doReturn(new CSAssignment(Resources.none(), type)).
-            when(queue)
-            .assignContainers(eq(clusterResource), any(PlacementSet.class),
-                any(ResourceLimits.class), any(SchedulingMode.class));
+            when(queue).assignContainers(eq(clusterResource),
+                any(CandidateNodeSet.class), any(ResourceLimits.class),
+                any(SchedulingMode.class));
 
         // Mock the node's resource availability
         Resource available = node.getUnallocatedResource();
@@ -159,9 +159,9 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
         return new CSAssignment(allocatedResource, type);
       }
-    }).
-    when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
-        any(ResourceLimits.class), any(SchedulingMode.class));
+    }).when(queue).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
     doNothing().when(node).releaseContainer(any(ContainerId.class), anyBoolean());
   }
@@ -425,10 +425,10 @@ public void testSortedQueues() throws Exception {
       clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), any(ResourceLimits.class),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
         any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), any(ResourceLimits.class),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
         any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 541539d..eacbf6e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -60,7 +60,7 @@
     .FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
@@ -88,13 +88,14 @@
     @Override
     public CSAssignment allocateContainersToNode(
-        PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
+        CandidateNodeSet<FiCaSchedulerNode> candidates,
+        boolean withNodeHeartbeat) {
       try {
         Thread.sleep(1000);
       } catch(InterruptedException e) {
         LOG.debug("Thread interrupted.");
       }
-      return super.allocateContainersToNode(ps, withNodeHeartbeat);
+      return super.allocateContainersToNode(candidates, withNodeHeartbeat);
     }
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 740ef33..71fddfc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -656,7 +656,7 @@ private void checkNodePartitionOfRequestedPriority(AppSchedulingInfo info,
       if (key.getPriority().getPriority() == priority) {
         Assert.assertEquals("Expected partition is " + expectedPartition,
             expectedPartition,
-            info.getSchedulingPlacementSet(key)
+            info.getAppPlacementAllocator(key)
                 .getPrimaryRequestedNodePartition());
       }
     }
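The assertion above covers the renamed read path on AppSchedulingInfo. A short sketch of that lookup pattern, assuming info and schedulerKey come from a test context like the one above (the helper class itself is illustrative, not part of the patch):

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

final class PartitionLookupSketch {
  // Returns the partition primarily requested at the given scheduler key,
  // via the renamed getAppPlacementAllocator accessor.
  static String primaryPartition(AppSchedulingInfo info,
      SchedulerRequestKey schedulerKey) {
    return info.getAppPlacementAllocator(schedulerKey)
        .getPrimaryRequestedNodePartition();
  }
}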
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index cdbbc51..a9196d1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
@@ -54,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -181,8 +180,9 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
       // Next call - nothing
       if (allocation > 0) {
         doReturn(new CSAssignment(Resources.none(), type)).when(queue)
-            .assignContainers(eq(clusterResource), any(PlacementSet.class),
-                any(ResourceLimits.class), any(SchedulingMode.class));
+            .assignContainers(eq(clusterResource),
+                any(CandidateNodeSet.class), any(ResourceLimits.class),
+                any(SchedulingMode.class));
 
         // Mock the node's resource availability
         Resource available = node.getUnallocatedResource();
@@ -192,8 +192,9 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
         return new CSAssignment(allocatedResource, type);
       }
-    }).when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
-        any(ResourceLimits.class), any(SchedulingMode.class));
+    }).when(queue).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
   }
 
   private float computeQueueAbsoluteUsedCapacity(CSQueue queue,
@@ -274,13 +275,14 @@ public void testSingleLevelQueues() throws Exception {
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(),
+        any(CandidateNodeSet.class), anyResourceLimits(),
         any(SchedulingMode.class));
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
 
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -293,10 +295,12 @@
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
 
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -307,10 +311,12 @@
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
 
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
@@ -325,10 +331,12 @@ public void testSingleLevelQueues() throws Exception {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, b);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
 
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -547,22 +555,25 @@ public void testMultiLevelQueues() throws Exception {
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, c, b);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    applyAllocationToQueue(clusterResource, 1*GB, a);
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 1 * GB, a);
 
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(c).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    applyAllocationToQueue(clusterResource, 2*GB, root);
+    allocationOrder.verify(c).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 2 * GB, root);
 
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, b);
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
@@ -586,24 +597,28 @@ public void testMultiLevelQueues() throws Exception {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, a2, a1, b, c);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a2).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, a);
 
     root.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, b);
 
     root.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(c).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(c).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -720,12 +735,14 @@ public void testOffSwitchScheduling() throws Exception {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a);
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
 
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -738,9 +755,11 @@ public void testOffSwitchScheduling() throws Exception {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
 
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -800,10 +819,12 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(b2, b3);
-    allocationOrder.verify(b2).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b3).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(b3).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
 
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
@@ -815,10 +836,12 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
     root.assignContainers(clusterResource, node_0, new
         ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b3, b2);
-    allocationOrder.verify(b3).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b2).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b3).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(b2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
 
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);
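Finally, the bulk of the test churn above is the Mockito matcher rename. A hedged sketch of the recurring verification pattern, assuming a mocked CSQueue and a cluster Resource as in the tests above (the helper class is illustrative only, not part of the patch):

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;

final class VerifyAssignContainersSketch {
  // Checks that assignContainers was invoked with the renamed
  // CandidateNodeSet matcher in place of the old PlacementSet one.
  static void verifyAssigned(CSQueue queue, Resource clusterResource) {
    verify(queue).assignContainers(eq(clusterResource),
        any(CandidateNodeSet.class), any(ResourceLimits.class),
        any(SchedulingMode.class));
  }
}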