diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 0dfdf20..c4401ea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -43,6 +43,8 @@ RMContainerState getState();
 
   Container getContainer();
+
+  long getAllocationTime();
 
   Resource getReservedResource();
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index de66f58..99d5644 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -133,21 +133,32 @@ private final ApplicationAttemptId appAttemptId;
   private final NodeId nodeId;
   private final Container container;
+  private final long allocationTime;
   private final EventHandler eventHandler;
   private final ContainerAllocationExpirer containerAllocationExpirer;
 
   private Resource reservedResource;
   private NodeId reservedNode;
   private Priority reservedPriority;
-  
+
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId,
       EventHandler handler,
       ContainerAllocationExpirer containerAllocationExpirer) {
+    this(container, appAttemptId, nodeId, handler, containerAllocationExpirer,
+        System.currentTimeMillis());
+  }
+
+  public RMContainerImpl(Container container,
+      ApplicationAttemptId appAttemptId, NodeId nodeId,
+      EventHandler handler,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      long allocationTime) {
     this.stateMachine = stateMachineFactory.make(this);
     this.containerId = container.getId();
     this.nodeId = nodeId;
     this.container = container;
+    this.allocationTime = allocationTime;
     this.appAttemptId = appAttemptId;
     this.eventHandler = handler;
     this.containerAllocationExpirer = containerAllocationExpirer;
@@ -171,6 +182,11 @@ public ApplicationAttemptId getApplicationAttemptId() {
   public Container getContainer() {
     return this.container;
   }
+
+  @Override
+  public long getAllocationTime() {
+    return this.allocationTime;
+  }
 
   @Override
   public RMContainerState getState() {
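The overloaded constructor exists so callers can pin the allocation timestamp instead of sampling the wall clock, which is what the preemption ordering later in this patch keys on. A minimal sketch of how a test might exercise it; the container, attempt ID, node ID, handler, and expirer names below are placeholders for the usual test fixtures, not part of this patch:

    // Hypothetical fragment: fix allocation times so ordering is deterministic.
    long base = 100000L;
    RMContainer older = new RMContainerImpl(container1, appAttemptId, nodeId,
        handler, expirer, base);             // allocated first
    RMContainer newer = new RMContainerImpl(container2, appAttemptId, nodeId,
        handler, expirer, base + 5000L);     // allocated five seconds later
    assert older.getAllocationTime() < newer.getAllocationTime();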
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index d21a888..cc3f709 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import java.util.List;
+import java.util.SortedSet;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
@@ -33,6 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.CapacityEnforcer.ResourcesToEnforce;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.CapacityEnforcer.ResourcesToPreempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -206,6 +209,36 @@ public void completedContainer(Resource clusterResource,
    */
   public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
       throws IOException;
+
+  /**
+   * Update the queue's capacity and demand bookkeeping, and return the
+   * outstanding demand at ANY locality that has not been met yet.
+   * @param currentTimeMs the current time, in milliseconds
+   * @return outstanding demand
+   */
+  public Resource updateCapacityAndDemand(long currentTimeMs);
+
+  /**
+   * Return the resources to be enforced to achieve capacity allocation.
+   * Resources are added to the specified collection.
+   */
+  public void getResourcesToBeEnforced(
+      SortedSet<ResourcesToEnforce> resourcesToBeEnforced);
+
+  /**
+   * Get the containers that can be preempted from over-capacity queues.
+   * Containers are added to the specified collection.
+   */
+  public void getPreemptableContainers(
+      List<ResourcesToPreempt> preemptableContainers);
+
+  /**
+   * Get the time at which the queue went under capacity while there was
+   * demand for more.
+   */
+  public long getUnderCapacityStartTime();
+
+  /**
+   * Set the time at which the queue went under capacity while there was
+   * demand for more.
+   */
+  public void setUnderCapacityStartTime(long time);
 
   /**
    * Update the cluster resource for queues as we add/remove nodes
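Taken together, these five methods define the handshake a periodic enforcer is expected to drive. A minimal sketch of one pass over the queue hierarchy, mirroring what CapacityEnforcer.enforceCapacity() does later in this patch (root stands for the scheduler's root CSQueue):

    // One enforcement pass: refresh timers/demand, then collect work lists.
    SortedSet<ResourcesToEnforce> toEnforce = new TreeSet<ResourcesToEnforce>(
        ResourcesToEnforce.resourcesToEnforceComparator);
    List<ResourcesToPreempt> preemptable = new ArrayList<ResourcesToPreempt>();
    for (CSQueue queue : root.getChildQueues()) {
      queue.updateCapacityAndDemand(System.currentTimeMillis());
      queue.getResourcesToBeEnforced(toEnforce);    // filled by under-capacity queues
      queue.getPreemptableContainers(preemptable);  // filled by over-capacity queues
    }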
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 4534b27..fbfbc34 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -22,8 +22,12 @@ import java.util.ArrayList;
 
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -50,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -76,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 
+
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -138,6 +144,8 @@ public Configuration getConf() {
 
   private Map<ApplicationAttemptId, FiCaSchedulerApp> applications =
       new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
+
+  private boolean capacityEnforcementEnabled;
 
   private boolean initialized = false;
@@ -176,6 +184,11 @@ public Resource getMaximumResourceCapability() {
   public synchronized int getNumClusterNodes() {
     return numNodeManagers;
   }
+
+  @Override
+  public synchronized boolean getCapacityEnforcementEnabled() {
+    return capacityEnforcementEnabled;
+  }
 
   @Override
   public RMContext getRMContext() {
@@ -197,6 +210,20 @@ public Resource getClusterResources() {
       this.rmContext = rmContext;
       initializeQueues(this.conf);
       initialized = true;
+
+      capacityEnforcementEnabled = conf.getBoolean(
+          CapacitySchedulerConfiguration.ENABLE_CAPACITY_ENFORCEMENT,
+          CapacitySchedulerConfiguration.DEFAULT_CAPACITY_ENFORCEMENT_ENABLED);
+      if (capacityEnforcementEnabled) {
+        CapacityEnforcer preemptionService = new CapacityEnforcer();
+        preemptionService.init(this.conf, root, this);
+        Thread preemptionThread =
+            new Thread(new CapacityEnforcerThread(preemptionService));
+        preemptionThread.setName("CapacitySchedulerEnforcementThread");
+        preemptionThread.setDaemon(true);
+        preemptionThread.start();
+      }
+
     } else {
       CapacitySchedulerConfiguration oldConf = this.conf;
@@ -568,7 +595,7 @@ private synchronized void nodeUpdate(RMNode nm,
     // Assign new containers...
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
-    
+
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
       FiCaSchedulerApp reservedApplication =
@@ -787,4 +814,425 @@ public SchedulerNodeReport getNodeReport(NodeId nodeId) {
     return node == null ? null : new SchedulerNodeReport(node);
   }
 
+  public static boolean isQueueUnderCapacity(Resource min, Resource used,
+      Resource pending) {
+    Resource total = Resources.add(used, pending);
+    Resource desired = Resources.min(min, total);
+    return Resources.lessThan(used, desired);
+  }
+
+  public static Resource demandToBeEnforced(Resource min, Resource used,
+      Resource pending) {
+    Resource total = Resources.add(used, pending);
+    Resource desired = Resources.min(min, total);
+    return Resources.subtract(desired, used);
+  }
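A concrete instance of this arithmetic, assuming memory-only resources: a queue guaranteed 10240MB that currently uses 4096MB and has 8192MB of pending demand wants min(10240, 4096 + 8192) = 10240MB, so it is under capacity and 6144MB remains to be enforced:

    Resource min = Resources.createResource(10 * 1024);     // guaranteed capacity
    Resource used = Resources.createResource(4 * 1024);     // currently allocated
    Resource pending = Resources.createResource(8 * 1024);  // unmet demand
    // used (4096) < desired (10240), so the queue is under capacity
    assert CapacityScheduler.isQueueUnderCapacity(min, used, pending);
    // desired - used = 6144MB still to be enforced
    assert CapacityScheduler.demandToBeEnforced(min, used, pending).getMemory()
        == 6 * 1024;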
+
+  // preemption related code
+
+  public static class CapacityEnforcer {
+
+    public static class ResourcesToEnforce {
+      CSQueue queue;
+      FiCaSchedulerApp app;
+      ResourceRequest request;
+      int numContainersToEnforce;
+
+      ResourcesToEnforce(CSQueue queue,
+          FiCaSchedulerApp app,
+          ResourceRequest request,
+          int numContainersToEnforce) {
+        this.queue = queue;
+        this.app = app;
+        this.request = request;
+        this.numContainersToEnforce = numContainersToEnforce;
+      }
+
+      static final Priority.Comparator priorityComparator =
+          new Priority.Comparator();
+
+      public static final Comparator<ResourcesToEnforce>
+          resourcesToEnforceComparator = new Comparator<ResourcesToEnforce>() {
+        @Override
+        public int compare(ResourcesToEnforce r1, ResourcesToEnforce r2) {
+          long time1 = r1.queue.getUnderCapacityStartTime();
+          long time2 = r2.queue.getUnderCapacityStartTime();
+          // the queue that has been under capacity longer (i.e. has the
+          // earlier start time) is higher priority
+          if (time1 < time2) {
+            return -1;
+          } else if (time1 > time2) {
+            return 1;
+          }
+          int appCompare =
+              CapacityScheduler.applicationComparator.compare(r1.app, r2.app);
+          if (appCompare != 0) {
+            return appCompare;
+          }
+
+          int priCompare = priorityComparator.compare(r1.request.getPriority(),
+              r2.request.getPriority());
+          if (priCompare != 0) {
+            return priCompare;
+          }
+
+          // a larger number of containers to meet is higher priority, to
+          // prevent starvation
+          if (r1.numContainersToEnforce > r2.numContainersToEnforce) {
+            return -1;
+          } else if (r1.numContainersToEnforce < r2.numContainersToEnforce) {
+            return 1;
+          }
+
+          // a larger resource size is higher priority, to prevent starvation
+          Resource cap1 = r1.request.getCapability();
+          Resource cap2 = r2.request.getCapability();
+          if (Resources.greaterThan(cap1, cap2)) {
+            return -1;
+          } else if (Resources.lessThan(cap1, cap2)) {
+            return 1;
+          } else {
+            return 0;
+          }
+        }
+      };
+
+    }
+
+    public static final Comparator<RMContainer> rmContainerPreemptionComparator =
+        new Comparator<RMContainer>() {
+      @Override
+      public int compare(RMContainer c1, RMContainer c2) {
+        // newer containers are preferred for preemption to prevent work loss
+        if (c1.getAllocationTime() > c2.getAllocationTime()) {
+          return -1;
+        } else if (c1.getAllocationTime() < c2.getAllocationTime()) {
+          return 1;
+        }
+        // smaller resources are preferred for preemption to prevent work loss
+        Resource cap1 = c1.getContainer().getResource();
+        Resource cap2 = c2.getContainer().getResource();
+        if (Resources.lessThan(cap1, cap2)) {
+          return -1;
+        } else if (Resources.greaterThan(cap1, cap2)) {
+          return 1;
+        }
+        // tie-break on container id so that same-time, equal-sized containers
+        // are not collapsed when held in a TreeSet
+        return c1.getContainerId().compareTo(c2.getContainerId());
+      }
+    };
+
+    public static class ResourcesToPreempt {
+      CSQueue queue;
+      RMContainer rmContainer;
+
+      public ResourcesToPreempt(CSQueue queue, RMContainer container) {
+        this.queue = queue;
+        this.rmContainer = container;
+      }
+
+      public static final Comparator<ResourcesToPreempt>
+          resourcesToPreemptComparator = new Comparator<ResourcesToPreempt>() {
+        @Override
+        public int compare(ResourcesToPreempt r1, ResourcesToPreempt r2) {
+          // prefer newer and then smaller containers to reduce lost work
+          return rmContainerPreemptionComparator.compare(r1.rmContainer,
+              r2.rmContainer);
+        }
+      };
+    }
+
+    public static class CapacityByNode {
+      Resource potentialCapacity;
+      Resource freeCapacity;
+      FiCaSchedulerNode node;
+      TreeSet<ResourcesToPreempt> preemptableCapacity =
+          new TreeSet<ResourcesToPreempt>(
+              ResourcesToPreempt.resourcesToPreemptComparator);
+      CapacityScheduler scheduler;
+
+      CapacityByNode(FiCaSchedulerNode node, CapacityScheduler scheduler) {
+        this.node = node;
+        this.scheduler = scheduler;
+        this.freeCapacity = Resources.clone(node.getAvailableResource());
+      }
+
+      Resource getPotentialCapacity() {
+        return potentialCapacity;
+      }
+
+      boolean allocateAndUpdate(ResourcesToEnforce enforce) {
+        if (Resources.lessThan(getPotentialCapacity(),
+            enforce.request.getCapability())) {
+          return false;
+        }
+
+        Resource neededResource =
+            Resources.clone(enforce.request.getCapability());
+        if (Resources.greaterThanOrEqual(freeCapacity, neededResource)) {
+          // allocate from free space; the eventual allocation will update the
+          // node's available resource
+          Resources.subtractFrom(freeCapacity, neededResource);
+        } else {
+          // reduce by free resources
+          Resources.subtractFrom(neededResource, freeCapacity);
+          // use a fresh zero rather than aliasing the shared none() constant
+          freeCapacity = Resources.createResource(0);
+          // preempt sufficient containers in priority order
+          Iterator<ResourcesToPreempt> iter = preemptableCapacity.iterator();
+          while (iter.hasNext()) {
+            ResourcesToPreempt preempt = iter.next();
+            iter.remove();
+            // kill this container
+            Container container = preempt.rmContainer.getContainer();
+            // TODO: Not sure if this ever actually adds this to the list of
+            // cleanup containers on the RMNode
+            // (see SchedulerNode.releaseContainer()).
+            scheduler.completedContainer(preempt.rmContainer,
+                SchedulerUtils.createAbnormalContainerStatus(
+                    container.getId(), SchedulerUtils.PREEMPTED_CONTAINER),
+                RMContainerEventType.KILL);
+            if (Resources.lessThanOrEqual(Resources.subtractFrom(neededResource,
+                container.getResource()), Resources.none())) {
+              // preempted enough containers to meet the need
+              break;
+            }
+          }
+          assert Resources.lessThanOrEqual(neededResource, Resources.none());
+        }
+
+        node.addResourcesToEnforce(new ResourcesToEnforce(enforce.queue,
+            enforce.app,
+            enforce.request,
+            1));
+
+        updatePotentialCapacity();
+        return true;
+      }
+
+      // always call this after free or preemptable capacity changes, to
+      // pre-calculate the value
+      void updatePotentialCapacity() {
+        potentialCapacity = Resources.clone(freeCapacity);
+        for (ResourcesToPreempt r : preemptableCapacity) {
+          Resources.addTo(potentialCapacity,
+              r.rmContainer.getContainer().getResource());
+        }
+      }
+
+      public static final Comparator<CapacityByNode>
+          capacityByNodeComparator = new Comparator<CapacityByNode>() {
+        @Override
+        public int compare(CapacityByNode n1, CapacityByNode n2) {
+          // higher potential capacity is preferred, for better matching
+          Resource cap1 = n1.getPotentialCapacity();
+          Resource cap2 = n2.getPotentialCapacity();
+          if (Resources.greaterThan(cap1, cap2)) {
+            return -1;
+          } else if (Resources.lessThan(cap1, cap2)) {
+            return 1;
+          } else {
+            return 0;
+          }
+        }
+      };
+    }
+
+    CapacityScheduler scheduler;
+    protected CSQueue rootQueue;
+    long capacityCheckIntervalMs;
+    long underCapacityWaitTimeMs;
+    PriorityQueue<CapacityByNode> nodes;
+
+    void init(CapacitySchedulerConfiguration conf,
+        CSQueue rootQueue,
+        CapacityScheduler scheduler) {
+      this.scheduler = scheduler;
+      this.rootQueue = rootQueue;
+      capacityCheckIntervalMs = conf.getLong(
+          CapacitySchedulerConfiguration.
+              CAPACITY_ENFORCEMENT_CHECK_INTERVAL_MS,
+          CapacitySchedulerConfiguration.
+              DEFAULT_CAPACITY_ENFORCEMENT_CHECK_INTERVAL_MS);
+      underCapacityWaitTimeMs = conf.getLong(
+          CapacitySchedulerConfiguration.UNDERCAPACITY_WAIT_TIME_INTERVAL_MS,
+          CapacitySchedulerConfiguration.
+              DEFAULT_UNDERCAPACITY_WAIT_TIME_INTERVAL_MS);
+    }
+
+    long getCapacityCheckInterval() {
+      return capacityCheckIntervalMs;
+    }
+
+    void updateQueueCapacityAndDemand() {
+      for (CSQueue queue : rootQueue.getChildQueues()) {
+        queue.updateCapacityAndDemand(System.currentTimeMillis());
+      }
+    }
+
+    SortedSet<ResourcesToEnforce> getResourcesToBeEnforced() {
+      TreeSet<ResourcesToEnforce> resourcesToBeEnforced =
+          new TreeSet<ResourcesToEnforce>(
+              ResourcesToEnforce.resourcesToEnforceComparator);
+      for (CSQueue queue : rootQueue.getChildQueues()) {
+        queue.getResourcesToBeEnforced(resourcesToBeEnforced);
+      }
+      return resourcesToBeEnforced;
+    }
+
+    List<ResourcesToPreempt> getPreemptableContainers() {
+      List<ResourcesToPreempt> preemptableContainers =
+          new ArrayList<ResourcesToPreempt>();
+      for (CSQueue queue : rootQueue.getChildQueues()) {
+        queue.getPreemptableContainers(preemptableContainers);
+      }
+      return preemptableContainers;
+    }
+
+    void createPrioritizedCapacityByNode() {
+      // PriorityQueue rejects an initial capacity below 1, hence the max()
+      nodes = new PriorityQueue<CapacityByNode>(
+          Math.max(1, scheduler.getNumClusterNodes()),
+          CapacityByNode.capacityByNodeComparator);
+      for (FiCaSchedulerNode node : scheduler.nodes.values()) {
+        // clear previous enforced resources
+        node.getResourcesToEnforce().clear();
+        CapacityByNode cNode = new CapacityByNode(node, scheduler);
+        cNode.updatePotentialCapacity();
+        nodes.add(cNode);
+      }
+    }
+
+    void allocateResourcesFromPotentialCapacity(
+        SortedSet<ResourcesToEnforce> resourcesToBeEnforced) {
+      Iterator<ResourcesToEnforce> iter = resourcesToBeEnforced.iterator();
+      ArrayList<ResourcesToEnforce> updatedRequests =
+          new ArrayList<ResourcesToEnforce>();
+      FiCaSchedulerApp appToSkip = null;
+      while (iter.hasNext()) {
+        ResourcesToEnforce enforce = iter.next();
+        // compare from enforce.app, which is never null, so the first
+        // iteration (appToSkip == null) is safe
+        if (enforce.app.equals(appToSkip)) {
+          // resources could not be enforced for this app earlier;
+          // skip other requests that are behind it in priority
+          // to prevent out of order allocation.
+          // this works because the sorted order will iterate over all requests
+          // from the same app together and in priority order
+          continue;
+        }
+        int containersEnforced = 0;
+        while (containersEnforced < enforce.numContainersToEnforce) {
+          CapacityByNode mostFreeNode = nodes.peek();
+          if (mostFreeNode != null && mostFreeNode.allocateAndUpdate(enforce)) {
+            // can fit 1 container for this request on this node
+            containersEnforced++;
+            // re-add the node to maintain priority order
+            nodes.remove();
+            nodes.add(mostFreeNode);
+          } else {
+            // could not fit a container for this request on the most free
+            // node (or there are no nodes), so further enforcement for this
+            // app is not possible; remember the app so that its subsequent
+            // requests can be skipped
+            appToSkip = enforce.app;
+            break;
+          }
+        }
+        if (containersEnforced > 0) {
+          // some containers were enforced
+          enforce.numContainersToEnforce -= containersEnforced;
+          iter.remove();
+          if (enforce.numContainersToEnforce > 0) {
+            // resources remaining to be allocated;
+            // remember to add this back later in sorted order
+            updatedRequests.add(enforce);
+          }
+        }
+      }
+
+      // add the updated requests back
+      resourcesToBeEnforced.addAll(updatedRequests);
+    }
+
+    void allocateResourcesByPreemption(
+        SortedSet<ResourcesToEnforce> resourcesToBeEnforced,
+        List<ResourcesToPreempt> preemptableContainers) {
+      HashMap<NodeId, CapacityByNode> nodeMap =
+          new HashMap<NodeId, CapacityByNode>();
+
+      // clear the node priority queue
+      Iterator<CapacityByNode> iter = nodes.iterator();
+      while (iter.hasNext()) {
+        CapacityByNode node = iter.next();
+        iter.remove();
+        nodeMap.put(node.node.getNodeID(), node);
+      }
+
+      // add all preemptable resources to their nodes
+      for (ResourcesToPreempt preempt : preemptableContainers) {
+        CapacityByNode node =
+            nodeMap.get(preempt.rmContainer.getContainer().getNodeId());
+        if (node != null) {
+          node.preemptableCapacity.add(preempt);
+        }
+      }
+
+      // re-create the node priority queue
+      for (CapacityByNode node : nodeMap.values()) {
+        node.updatePotentialCapacity();
+        nodes.add(node);
+      }
+
+      allocateResourcesFromPotentialCapacity(resourcesToBeEnforced);
+    }
+
+    void enforceCapacity() {
+      synchronized (scheduler) {
+        LOG.info("Running preemption check");
+        // update demand and capacity times
+        updateQueueCapacityAndDemand();
+
+        // collect containers of a given capability needed to enforce capacity
+        SortedSet<ResourcesToEnforce> resourcesToBeEnforced =
+            getResourcesToBeEnforced();
+        if (resourcesToBeEnforced.isEmpty()) {
+          return;
+        }
+
+        createPrioritizedCapacityByNode();
+
+        // try to enforce capacity by scheduling to free nodes
+        allocateResourcesFromPotentialCapacity(resourcesToBeEnforced);
+
+        if (resourcesToBeEnforced.isEmpty()) {
+          return;
+        }
+
+        // try to enforce capacity by preempting running containers
+        List<ResourcesToPreempt> preemptableContainers =
+            getPreemptableContainers();
+
+        allocateResourcesByPreemption(resourcesToBeEnforced,
+            preemptableContainers);
+      }
+    }
+  }
+
+  /**
+   * A runnable which runs the preemption logic
+   */
+  private class CapacityEnforcerThread implements Runnable {
+    CapacityEnforcer service;
+
+    CapacityEnforcerThread(CapacityEnforcer service) {
+      this.service = service;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          service.enforceCapacity();
+          Thread.sleep(service.getCapacityCheckInterval());
+        } catch (Exception e) {
+          LOG.error("Exception in capacity scheduler CapacityEnforcerThread",
+              e);
+        }
+      }
+    }
+  }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index ef5f171..457642b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -121,6 +121,27 @@
   @Private
   public static final int DEFAULT_NODE_LOCALITY_DELAY = -1;
+
+  @Private
+  public static final String ENABLE_CAPACITY_ENFORCEMENT =
+      PREFIX + "enable-capacity-enforcement";
+
+  @Private
+  public static final boolean DEFAULT_CAPACITY_ENFORCEMENT_ENABLED = false;
+
+  @Private
+  public static final String CAPACITY_ENFORCEMENT_CHECK_INTERVAL_MS =
+      PREFIX + "capacity-enforcement-check-interval-ms";
+  @Private
+  public static final long DEFAULT_CAPACITY_ENFORCEMENT_CHECK_INTERVAL_MS =
+      10 * YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS;
+
+  @Private
+  public static final String UNDERCAPACITY_WAIT_TIME_INTERVAL_MS =
+      PREFIX + "undercapacity-wait-time-interval-ms";
+  @Private
+  public static final long DEFAULT_UNDERCAPACITY_WAIT_TIME_INTERVAL_MS =
+      1000 * 60 * 5;
 
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
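For reference, a sketch of how these knobs might be set programmatically in a test or driver. The keys resolve under the class's existing PREFIX; the interval values below are arbitrary examples, not recommendations:

    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // Enforcement is off by default.
    conf.setBoolean(
        CapacitySchedulerConfiguration.ENABLE_CAPACITY_ENFORCEMENT, true);
    // Run the background check every 30 seconds.
    conf.setLong(
        CapacitySchedulerConfiguration.CAPACITY_ENFORCEMENT_CHECK_INTERVAL_MS,
        30 * 1000L);
    // Tolerate brief dips under capacity for up to 2 minutes.
    conf.setLong(
        CapacitySchedulerConfiguration.UNDERCAPACITY_WAIT_TIME_INTERVAL_MS,
        2 * 60 * 1000L);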
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 861d630..172496f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -36,6 +36,8 @@ RMContainerTokenSecretManager getContainerTokenSecretManager();
 
   int getNumClusterNodes();
+
+  boolean getCapacityEnforcementEnabled();
 
   RMContext getRMContext();
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 28a81f7..d061e5f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -27,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.Stack;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
@@ -62,6 +65,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.CapacityEnforcer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.CapacityEnforcer.ResourcesToEnforce;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.CapacityEnforcer.ResourcesToPreempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -116,6 +122,9 @@
 
   private Map<QueueACL, AccessControlList> acls =
       new HashMap<QueueACL, AccessControlList>();
+
+  long underCapacityStartTime = 0;
+  Resource demandToBeEnforced = null;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -765,6 +774,27 @@ private synchronized FiCaSchedulerApp getApplication(
           + " #applications=" + activeApplications.size());
     }
 
+    // Check for enforced resources
+    if (!node.getResourcesToEnforce().isEmpty()) {
+      // enforced resources exist
+      if (!this.equals(node.getResourcesToEnforce().get(0).queue)) {
+        // the next enforce request on this node is not for this queue
+        return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
+      }
+
+      Resource assigned = assignEnforcedResources(clusterResource, node);
+      if (Resources.greaterThan(assigned, Resources.none())) {
+        // assignment was made
+        return new CSAssignment(assigned, NodeType.OFF_SWITCH);
+      }
+      // no assignment was made
+      if (!node.getResourcesToEnforce().isEmpty()) {
+        // enforced resources still left
+        return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
+      }
+      // no enforced resources remain; do normal assignment
+    }
+
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
@@ -859,6 +889,74 @@ private synchronized FiCaSchedulerApp getApplication(
     return NULL_ASSIGNMENT;
   }
+
+  private synchronized Resource assignEnforcedResources(
+      Resource clusterResource, FiCaSchedulerNode node) {
+    List<ResourcesToEnforce> resourcesToEnforce = node.getResourcesToEnforce();
+    if (resourcesToEnforce.isEmpty()) {
+      return Resources.none();
+    }
+
+    while (!resourcesToEnforce.isEmpty()) {
+      ResourcesToEnforce enforce = resourcesToEnforce.get(0);
+      FiCaSchedulerApp app = enforce.app;
+      if (!enforce.queue.equals(this)) {
+        // the request at the head of the list is not for this queue;
+        // don't assign out of order
+        return Resources.none();
+      }
+      ResourceRequest request = enforce.request;
+      Priority priority = request.getPriority();
+      Resource capability = request.getCapability();
+      ResourceRequest currRequest =
+          app.getResourceRequest(priority, RMNode.ANY);
+
+      // check if the enforced request is still valid
+      if (currRequest == null || currRequest.getNumContainers() == 0 ||
+          !currRequest.getCapability().equals(capability)) {
+        // resources are no longer needed at this priority, or the
+        // capability has changed
+        resourcesToEnforce.remove(0);
+        // try to assign the next enforced resource
+        continue;
+      }
+
+      if (currRequest.getNumContainers() < enforce.numContainersToEnforce) {
+        enforce.numContainersToEnforce = currRequest.getNumContainers();
+      }
+
+      if (Resources.lessThan(node.getAvailableResource(), capability)) {
+        // not enough resources to meet enforcement;
+        // don't assign out of order
+        return Resources.none();
+      }
+
+      Resource assigned = assignContainer(clusterResource,
+          node,
+          app,
+          priority,
+          // send the actual current request,
+          // because it's going to be modified
+          // in this call
+          currRequest,
+          NodeType.OFF_SWITCH,
+          null);
+      if (Resources.lessThanOrEqual(assigned, Resources.none())) {
+        // failed to assign for some reason, even when resources are
+        // available; return and don't assign out of order
+        return Resources.none();
+      }
+
+      // successful assignment
+      enforce.numContainersToEnforce--;
+      if (enforce.numContainersToEnforce == 0) {
+        // remove this from the head of the list
+        resourcesToEnforce.remove(0);
+      }
+
+      // following the current convention, return after 1 assignment
+      return assigned;
+    }
+
+    return Resources.none();
+  }
 
   private synchronized Resource assignReservedContainer(FiCaSchedulerApp application,
       FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
@@ -1288,6 +1386,11 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
       return container.getResource();
     } else {
+      if (scheduler.getCapacityEnforcementEnabled()) {
+        // Disable reservation when capacity enforcement is on
+        return Resources.none();
+      }
+
       // Reserve by 'charging' in advance...
       reserve(application, priority, node, rmContainer, container);
@@ -1447,6 +1550,168 @@ public synchronized void updateClusterResource(Resource clusterResource) {
   }
 
   @Override
+  public synchronized Resource updateCapacityAndDemand(long currentTimeMs) {
+    Resource unAllocatedDemand = Resources.createResource(0);
+    Resource resourceCapacity = Resources.multiply(
+        scheduler.getClusterResources(),
+        absoluteCapacity);
+
+    // get the unallocated demand for all active applications
+    for (FiCaSchedulerApp app : activeApplications) {
+      synchronized (app) {
+        for (Priority priority : app.getPriorities()) {
+          // get the overall demand for resources that has not been met yet
+          ResourceRequest required =
+              app.getResourceRequest(priority, RMNode.ANY);
+          Resource appUnAllocatedDemand = Resources.multiply(
+              required.getCapability(), required.getNumContainers());
+          // accumulate in place; plain add() would discard the result
+          Resources.addTo(unAllocatedDemand, appUnAllocatedDemand);
+        }
+      }
+    }
+
+    if (CapacityScheduler.isQueueUnderCapacity(resourceCapacity, usedResources,
+        unAllocatedDemand)) {
+      // queue is under capacity
+      if (getUnderCapacityStartTime() == 0) {
+        setUnderCapacityStartTime(currentTimeMs);
+      }
+      // update resources to be enforced
+      demandToBeEnforced = CapacityScheduler.demandToBeEnforced(
+          resourceCapacity,
+          usedResources,
+          unAllocatedDemand);
+    } else {
+      // queue is at or over capacity
+      setUnderCapacityStartTime(0);
+      demandToBeEnforced = null;
+    }
+
+    return unAllocatedDemand;
+  }
+
+  @Override
+  public synchronized void getResourcesToBeEnforced(
+      SortedSet<ResourcesToEnforce> resourcesToBeEnforced) {
+    if (getUnderCapacityStartTime() == 0) {
+      // not under capacity
+      return;
+    }
+
+    assert demandToBeEnforced != null;
+
+    Resource resourceToBeEnforced = Resources.clone(demandToBeEnforced);
+
+    for (FiCaSchedulerApp app : activeApplications) {
+      if (Resources.lessThanOrEqual(resourceToBeEnforced, Resources.none())) {
+        break;
+      }
+      boolean appRequirementFullyMet = true;
+      synchronized (app) {
+        for (Priority priority : app.getPriorities()) {
+          if (Resources.lessThanOrEqual(resourceToBeEnforced,
+              Resources.none())) {
+            break;
+          }
+          // get the overall demand for resources that has not been met yet
+          ResourceRequest request = app
+              .getResourceRequest(priority, RMNode.ANY);
+          int numContainers = 0;
+          // this could end up asking for more resources than strictly needed
+          // to meet the allocation.
+          // That's ok, since the app has been under-allocated
+          // for some time
+          while (numContainers < request.getNumContainers() &&
+              Resources.greaterThan(resourceToBeEnforced, Resources.none())) {
+            Resources.subtractFrom(resourceToBeEnforced,
+                request.getCapability());
+            ++numContainers;
+          }
+          if (numContainers > 0) {
+            ResourcesToEnforce enforcedResource =
+                new ResourcesToEnforce(this, app, request, numContainers);
+            resourcesToBeEnforced.add(enforcedResource);
+          }
+          if (numContainers < request.getNumContainers()) {
+            // done scheduling from this app
+            appRequirementFullyMet = false;
+            break;
+          }
+        }
+        if (!appRequirementFullyMet) {
+          // if all requests for a higher priority application have not been
+          // met, then don't go further
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public synchronized void getPreemptableContainers(
+      List<ResourcesToPreempt> preemptableContainers) {
+    if (getUnderCapacityStartTime() != 0) {
+      // queue is under capacity; nothing to preempt
+      return;
+    }
+
+    Resource resourceCapacity = Resources.multiply(
+        scheduler.getClusterResources(),
+        absoluteCapacity);
+    Resource preemptableResources = Resources.subtract(usedResources,
+        resourceCapacity);
+    if (Resources.lessThan(preemptableResources, Resources.none())) {
+      // not over capacity
+      return;
+    }
+
+    Stack<FiCaSchedulerApp> appsReversePriority =
+        new Stack<FiCaSchedulerApp>();
+    // get the applications in reverse order of priority
+    for (FiCaSchedulerApp app : activeApplications) {
+      appsReversePriority.push(app);
+    }
+
+    while (!appsReversePriority.isEmpty()) {
+      if (Resources.lessThan(preemptableResources,
+          scheduler.getMinimumResourceCapability())) {
+        break;
+      }
+      FiCaSchedulerApp app = appsReversePriority.pop();
+      synchronized (app) {
+        // allocated container priority information is not preserved;
+        // sort by allocation time to reduce wasted work
+        Collection<RMContainer> allocatedContainers =
+            app.getLiveContainersMap().values();
+        TreeSet<RMContainer> sortedContainers = new TreeSet<RMContainer>(
+            CapacityEnforcer.rmContainerPreemptionComparator);
+        sortedContainers.addAll(allocatedContainers);
+        for (RMContainer rmContainer : sortedContainers) {
+          // it's possible that no containers will be added if they are all
+          // larger than the remaining resources to preempt.
+          // Better to let that happen than preempt such a container and
+          // make this queue go under capacity
+          Resource containerResource =
+              rmContainer.getContainer().getResource();
+          if (Resources.lessThan(preemptableResources, containerResource)) {
+            continue;
+          }
+          Resources.subtractFrom(preemptableResources, containerResource);
+          ResourcesToPreempt preemptResource =
+              new ResourcesToPreempt(this, rmContainer);
+          preemptableContainers.add(preemptResource);
+        }
+      }
+    }
+  }
+
+  @Override
+  public long getUnderCapacityStartTime() {
+    return underCapacityStartTime;
+  }
+
+  @Override
+  public void setUnderCapacityStartTime(long time) {
+    underCapacityStartTime = time;
+  }
+
+  @Override
   public QueueMetrics getMetrics() {
     return metrics;
   }
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index f5f4498..9caf345 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
@@ -51,6 +52,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.CapacityEnforcer.ResourcesToEnforce;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.CapacityEnforcer.ResourcesToPreempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -60,6 +63,7 @@ private static final Log LOG = LogFactory.getLog(ParentQueue.class);
 
+  CapacitySchedulerContext scheduler;
   private CSQueue parent;
 
   private final String queueName;
@@ -92,6 +96,8 @@
 
   private Map<QueueACL, AccessControlList> acls =
       new HashMap<QueueACL, AccessControlList>();
+
+  long underCapacityStartTime = 0;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -99,6 +105,7 @@
   public ParentQueue(CapacitySchedulerContext cs,
       String queueName, Comparator<CSQueue> comparator,
       CSQueue parent, CSQueue old) {
+    this.scheduler = cs;
     minimumAllocation = cs.getMinimumResourceCapability();
 
     this.parent = parent;
@@ -730,6 +737,59 @@ public synchronized void updateClusterResource(Resource clusterResource) {
   }
 
   @Override
+  public synchronized Resource updateCapacityAndDemand(long currentTimeMs) {
+    Resource unAllocatedDemand = Resources.createResource(0);
+    Resource resourceCapacity = Resources.multiply(
+        scheduler.getClusterResources(),
+        absoluteCapacity);
+
+    for (CSQueue queue : getChildQueues()) {
+      // accumulate in place; plain add() would discard the result
+      Resources.addTo(unAllocatedDemand,
+          queue.updateCapacityAndDemand(currentTimeMs));
+    }
+
+    if (CapacityScheduler.isQueueUnderCapacity(resourceCapacity,
+        usedResources,
+        unAllocatedDemand)) {
+      // queue is under capacity
+      if (getUnderCapacityStartTime() == 0) {
+        setUnderCapacityStartTime(currentTimeMs);
+      }
+    } else {
+      // queue is at or over capacity
+      setUnderCapacityStartTime(0);
+    }
+
+    return unAllocatedDemand;
+  }
+
+  @Override
+  public void getResourcesToBeEnforced(
+      SortedSet<ResourcesToEnforce> resourcesToBeEnforced) {
+    for (CSQueue queue : getChildQueues()) {
+      queue.getResourcesToBeEnforced(resourcesToBeEnforced);
+    }
+  }
+
+  @Override
+  public void getPreemptableContainers(
+      List<ResourcesToPreempt> preemptableContainers) {
+    for (CSQueue queue : getChildQueues()) {
+      queue.getPreemptableContainers(preemptableContainers);
+    }
+  }
+
+  @Override
+  public long getUnderCapacityStartTime() {
+    return underCapacityStartTime;
+  }
+
+  @Override
+  public void setUnderCapacityStartTime(long time) {
+    underCapacityStartTime = time;
+  }
+
+  @Override
   public QueueMetrics getMetrics() {
     return metrics;
   }
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index e53ca82..5bc0239 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -182,6 +182,10 @@ public String getQueueName() {
   public synchronized Collection<RMContainer> getLiveContainers() {
     return new ArrayList<RMContainer>(liveContainers.values());
   }
+
+  // returns the live map itself; callers must synchronize on this
+  // application while iterating it
+  public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
+    return liveContainers;
+  }
 
   public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
     // Cleanup all scheduling information
@@ -247,7 +251,7 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
     RMContainer rmContainer = new RMContainerImpl(container, this
         .getApplicationAttemptId(), node.getNodeID(), this.rmContext
         .getDispatcher().getEventHandler(), this.rmContext
-        .getContainerAllocationExpirer());
+        .getContainerAllocationExpirer(), System.currentTimeMillis());
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index b07e9c6..f3efa85 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.CapacityEnforcer.ResourcesToEnforce;
 
 public class FiCaSchedulerNode extends SchedulerNode {
@@ -58,6 +59,12 @@
   private final Map<ContainerId, RMContainer> launchedContainers =
       new HashMap<ContainerId, RMContainer>();
 
+  /*
+   * Ordered list of enforced resource requests that the capacity enforcer
+   * has targeted at this node; guarded by the scheduler lock
+   */
+  private final List<ResourcesToEnforce> resourcesToEnforce =
+      new ArrayList<ResourcesToEnforce>();
+
   private final RMNode rmNode;
 
   public static final String ANY = "*";
@@ -110,6 +117,14 @@ public synchronized void allocateContainer(ApplicationId applicationId,
         getUsedResource() + " used and " +
         getAvailableResource() + " available");
   }
+
+  public void addResourcesToEnforce(ResourcesToEnforce enforce) {
+    resourcesToEnforce.add(enforce);
+  }
+
+  public List<ResourcesToEnforce> getResourcesToEnforce() {
+    return resourcesToEnforce;
+  }
 
   @Override
   public synchronized Resource getAvailableResource() {
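Since enforceCapacity() synchronizes on the scheduler and performs a complete pass per call, a test in the same package could drive the enforcer by hand instead of waiting on the background thread. A rough sketch under those assumptions; rootQueue and scheduler are placeholders for whatever the existing CapacityScheduler tests construct and initialize:

    // Hypothetical test fragment: one hand-driven enforcement pass.
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    conf.setBoolean(
        CapacitySchedulerConfiguration.ENABLE_CAPACITY_ENFORCEMENT, true);
    CapacityScheduler.CapacityEnforcer enforcer =
        new CapacityScheduler.CapacityEnforcer();
    enforcer.init(conf, rootQueue, scheduler); // rootQueue: scheduler's root CSQueue
    enforcer.enforceCapacity();                // single synchronous pass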