commit d70857263b1225415c9cb1788c3aae6908296ab3 Author: Wangda Tan Date: Tue May 24 00:19:55 2016 -0700 global-scheduling diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index f1b4f07..01cad88 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -51,7 +51,7 @@ // {"default-rack", "hostFoo"} or "coreSwitchA/TORSwitchB", "hostBar" public static String[] getRackHostName(String hostname) { NodeBase node = new NodeBase(hostname); - return new String[] {node.getNetworkLocation().substring(1), + return new String[] {"/" + node.getNetworkLocation().substring(1), node.getName()}; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 64eb777..6380b87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -28,6 +28,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -133,7 +134,6 @@ public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); } - @VisibleForTesting public ClusterNodeTracker getNodeTracker() { return nodeTracker; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index e487f69..accb91d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -48,8 +48,8 @@ private static final Log LOG = LogFactory.getLog(ClusterNodeTracker.class); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); - private Lock readLock = readWriteLock.readLock(); - private Lock writeLock = readWriteLock.writeLock(); + private volatile Lock readLock = readWriteLock.readLock(); + private volatile Lock writeLock = readWriteLock.writeLock(); private HashMap nodes = new HashMap<>(); private Map nodeNameToNodeMap = new HashMap<>(); @@ -65,9 +65,26 @@ private boolean forceConfiguredMaxAllocation = true; private long configuredMaxAllocationWaitTime; + // version of node list, it will be increased when adding / removing of nodes + // 
happens. It starts at zero and wraps back to 0 when it would go + // beyond Long.MAX_VALUE. + private volatile long nodeListVersion = 0; + + private void updateNodeListVersion() { + if (nodeListVersion == Long.MAX_VALUE) { + nodeListVersion = 0; + } else { + nodeListVersion++; + } + } + + public long getNodeListVersion() { + return nodeListVersion; + } + public void addNode(N node) { writeLock.lock(); try { + updateNodeListVersion(); + nodes.put(node.getNodeID(), node); nodeNameToNodeMap.put(node.getNodeName(), node); @@ -90,6 +107,14 @@ public void addNode(N node) { } } + /* + * The read lock can be used by external components to do fine-grained + * locking of the node list. + */ + public Lock getNodeListReadLock() { + return readLock; + } + public boolean exists(NodeId nodeId) { readLock.lock(); try { @@ -159,6 +184,8 @@ public N removeNode(NodeId nodeId) { LOG.warn("Attempting to remove a non-existent node " + nodeId); return null; } + + updateNodeListVersion(); nodeNameToNodeMap.remove(node.getNodeName()); // Update nodes per rack as well diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index c4b32a8..4d07d97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -328,12 +328,11 @@ public synchronized int getReReservations(SchedulerRequestKey schedulerKey) { /** * Get total current reservations.
- * Used only by unit tests * @return total current reservations */ @Stable @Private - public synchronized Resource getCurrentReservation() { + public Resource getCurrentReservation() { return attemptResourceUsage.getReserved(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 2efdbd0..69595ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -72,6 +72,9 @@ private volatile Set labels = null; + // The latest time at which this node was visited by the scheduler + protected volatile long latestVisitedTimestamp = -1; + public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; @@ -432,4 +435,8 @@ public void setNodeUtilization(ResourceUtilization nodeUtilization) { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + public long getLatestVisitedTimestamp() { + return this.latestVisitedTimestamp; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9c88154..7e43275 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,6 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -688,4 +692,15 @@ public Resource getTotalKillableResource(String partition) { return csContext.getPreemptionManager().getKillableContainers(queueName, partition); } + + // Only for testing + @VisibleForTesting + public synchronized CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits
currentResourceLimits, + SchedulingMode schedulingMode) { + return assignContainers(clusterResource, + new PlacementSet(node, ImmutableMap.of(node.getNodeID(), node), + node.getPartition()), + currentResourceLimits, schedulingMode); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index daf7790..a03fccc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.security.AccessControlException; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -195,13 +197,18 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, /** * Assign containers to applications in the queue or its children (if any). * @param clusterResource the resource of the cluster. - * @param node node on which resources are available - * @param resourceLimits how much overall resource of this queue can use. - * @param schedulingMode Type of exclusive check when assign container on a + * @param placementSet the set of candidate nodes on which resources are available + * @param resourceLimits how much overall resource this queue can use + * @param schedulingMode type of exclusivity check when assigning containers on a * NodeManager, see {@link SchedulingMode}.
* @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, + PlacementSet placementSet, ResourceLimits resourceLimits, + SchedulingMode schedulingMode); + + @VisibleForTesting + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index bedf455..3545d9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,23 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -111,6 +97,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -133,8 +120,22 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; @LimitedPrivate("yarn") @Evolving @@ -145,7 +146,7 @@ private static final Log LOG = 
LogFactory.getLog(CapacityScheduler.class); private YarnAuthorizationProvider authorizer; - + private CSQueue root; // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -167,7 +168,7 @@ public int compare(CSQueue q1, CSQueue q2) { return q1.getQueuePath().compareTo(q2.getQueuePath()); } }; - + static final PartitionedQueueComparator partitionedQueueComparator = new PartitionedQueueComparator(); @@ -175,7 +176,7 @@ public int compare(CSQueue q1, CSQueue q2) { public void setConf(Configuration conf) { yarnConf = conf; } - + private void validateConf(Configuration conf) { // validate scheduler memory allocation setting int minMem = conf.getInt( @@ -229,7 +230,11 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; - private RMNodeLabelsManager labelManager; + + private boolean scheduleGlobally; + private GlobalSchedulingThread globalSchedulingThread; + + volatile private RMNodeLabelsManager labelManager; private SchedulerHealth schedulerHealth = new SchedulerHealth(); volatile long lastNodeUpdateTime; @@ -254,14 +259,14 @@ public QueueMetrics getRootQueueMetrics() { public CSQueue getRootQueue() { return root; } - + @Override public CapacitySchedulerConfiguration getConfiguration() { return conf; } @Override - public synchronized RMContainerTokenSecretManager + public synchronized RMContainerTokenSecretManager getContainerTokenSecretManager() { return this.rmContext.getContainerTokenSecretManager(); } @@ -275,7 +280,7 @@ public ResourceCalculator getResourceCalculator() { public Comparator getNonPartitionedQueueComparator() { return nonPartitionedQueueComparator; } - + @Override public PartitionedQueueComparator getPartitionedQueueComparator() { return partitionedQueueComparator; @@ -310,6 +315,7 @@ private synchronized void initScheduler(Configuration configuration) throws initializeQueues(this.conf); this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); + // Initialize async scheduling parameters scheduleAsynchronously = this.conf.getScheduleAynschronously(); asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, @@ -318,12 +324,21 @@ private synchronized void initScheduler(Configuration configuration) throws asyncSchedulerThread = new AsyncScheduleThread(this); } + // Initialize global scheduling parameters + scheduleGlobally = this.conf.getBoolean( + CapacitySchedulerConfiguration.SCHEDULE_GLOBALLY_ENABLE, false); + if (scheduleGlobally) { + globalSchedulingThread = new GlobalSchedulingThread(this); + } + + LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + "asynchronousScheduling=" + scheduleAsynchronously + ", " + - "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); + "asyncScheduleInterval=" + asyncScheduleInterval + "ms, " + + "globalScheduling=" + scheduleGlobally); } private synchronized void startSchedulerThreads() { @@ -332,6 +347,12 @@ private synchronized void startSchedulerThreads() { "asyncSchedulerThread is null"); asyncSchedulerThread.start(); } + + if (scheduleGlobally) { + Preconditions.checkNotNull(globalSchedulingThread, + "globalSchedulerThread is null"); + globalSchedulingThread.start(); + } } @Override @@ -378,40 +399,17 @@ public void serviceStop() throws Exception { // update lazy preemption this.isLazyPreemptionEnabled = 
this.conf.getLazyPreemptionEnabled(); } - + long getAsyncScheduleInterval() { return asyncScheduleInterval; } private final static Random random = new Random(System.currentTimeMillis()); - - /** - * Schedule on all nodes by starting at a random point. - * @param cs - */ - static void schedule(CapacityScheduler cs) { - // First randomize the start point - int current = 0; - Collection nodes = cs.nodeTracker.getAllNodes(); - int start = random.nextInt(nodes.size()); - for (FiCaSchedulerNode node : nodes) { - if (current++ >= start) { - cs.allocateContainersToNode(node); - } - } - // Now, just get everyone to be safe - for (FiCaSchedulerNode node : nodes) { - cs.allocateContainersToNode(node); - } - try { - Thread.sleep(cs.getAsyncScheduleInterval()); - } catch (InterruptedException e) {} - } - + static class AsyncScheduleThread extends Thread { - private final CapacityScheduler cs; - private AtomicBoolean runSchedules = new AtomicBoolean(false); + final CapacityScheduler cs; + AtomicBoolean runSchedules = new AtomicBoolean(false); public AsyncScheduleThread(CapacityScheduler cs) { this.cs = cs; @@ -426,11 +424,34 @@ public void run() { Thread.sleep(100); } catch (InterruptedException ie) {} } else { - schedule(cs); + asyncSchedule(cs); } } } + /** + * Schedule on all nodes by starting at a random point. + * @param cs + */ + static void asyncSchedule(CapacityScheduler cs) { + // First randomize the start point + int current = 0; + Collection nodes = cs.nodeTracker.getAllNodes(); + int start = random.nextInt(nodes.size()); + for (FiCaSchedulerNode node : nodes) { + if (current++ >= start) { + cs.allocateContainersToNode(node); + } + } + // Now, just get everyone to be safe + for (FiCaSchedulerNode node : nodes) { + cs.allocateContainersToNode(node); + } + try { + Thread.sleep(cs.getAsyncScheduleInterval()); + } catch (InterruptedException e) {} + } + public void beginSchedule() { runSchedules.set(true); } @@ -440,9 +461,9 @@ public void suspendSchedule() { } } - + @Private - public static final String ROOT_QUEUE = + public static final String ROOT_QUEUE = CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; static class QueueHook { @@ -487,16 +508,20 @@ public CSQueue hook(CSQueue queue) { return null; } + public boolean globalSchedulingEnabled() { + return scheduleGlobally; + } + private void updatePlacementRules() throws IOException { List placementRules = new ArrayList<>(); - + // Initialize UserGroupMappingPlacementRule // TODO, need make this defineable by configuration. 
UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule(); if (null != ugRule) { placementRules.add(ugRule); } - + rmContext.getQueuePlacementManager().updateRules(placementRules); } @@ -504,8 +529,8 @@ private void updatePlacementRules() throws IOException { private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { - root = - parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, + root = + parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, noop); labelManager.reinitializeQueueLabels(getQueueToLabels()); LOG.info("Initialized root queue " + root); @@ -517,20 +542,20 @@ private void initializeQueues(CapacitySchedulerConfiguration conf) } @Lock(CapacityScheduler.class) - private void reinitializeQueues(CapacitySchedulerConfiguration conf) + private void reinitializeQueues(CapacitySchedulerConfiguration conf) throws IOException { // Parse new queues Map newQueues = new HashMap(); - CSQueue newRoot = - parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - newQueues, queues, noop); - + CSQueue newRoot = + parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, + newQueues, queues, noop); + // Ensure all existing queues are still present validateExistingQueues(queues, newQueues); // Add new queues addNewQueues(queues, newQueues); - + // Re-configure queues root.reinitialize(newRoot, getClusterResource()); updatePlacementRules(); @@ -574,14 +599,14 @@ public static void setQueueAcls(YarnAuthorizationProvider authorizer, */ @Lock(CapacityScheduler.class) private void validateExistingQueues( - Map queues, Map newQueues) + Map queues, Map newQueues) throws IOException { // check that all static queues are included in the newQueues list for (Map.Entry e : queues.entrySet()) { if (!(e.getValue() instanceof ReservationQueue)) { String queueName = e.getKey(); CSQueue oldQueue = e.getValue(); - CSQueue newQueue = newQueues.get(queueName); + CSQueue newQueue = newQueues.get(queueName); if (null == newQueue) { throw new IOException(queueName + " cannot be found during refresh!"); } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { @@ -601,7 +626,7 @@ private void validateExistingQueues( */ @Lock(CapacityScheduler.class) private void addNewQueues( - Map queues, Map newQueues) + Map queues, Map newQueues) { for (Map.Entry e : newQueues.entrySet()) { String queueName = e.getKey(); @@ -611,19 +636,19 @@ private void addNewQueues( } } } - + @Lock(CapacityScheduler.class) static CSQueue parseQueue( CapacitySchedulerContext csContext, - CapacitySchedulerConfiguration conf, + CapacitySchedulerConfiguration conf, CSQueue parent, String queueName, Map queues, - Map oldQueues, + Map oldQueues, QueueHook hook) throws IOException { CSQueue queue; String fullQueueName = (parent == null) ? queueName : (parent.getQueuePath() + "." 
+ queueName); - String[] childQueueNames = + String[] childQueueNames = conf.getQueues(fullQueueName); boolean isReservableQueue = conf.isReservable(fullQueueName); if (childQueueNames == null || childQueueNames.length == 0) { @@ -650,7 +675,7 @@ static CSQueue parseQueue( throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = + ParentQueue parentQueue = new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests @@ -658,8 +683,8 @@ static CSQueue parseQueue( List childQueues = new ArrayList(); for (String childQueueName : childQueueNames) { - CSQueue childQueue = - parseQueue(csContext, conf, queue, childQueueName, + CSQueue childQueue = + parseQueue(csContext, conf, queue, childQueueName, queues, oldQueues, hook); childQueues.add(childQueue); } @@ -758,7 +783,7 @@ private synchronized void addApplication(ApplicationId applicationId, return; } if (!(queue instanceof LeafQueue)) { - String message = "Application " + applicationId + + String message = "Application " + applicationId + " submitted by user " + user + " to non-leaf queue: " + queueName; this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, @@ -862,7 +887,7 @@ private synchronized void doneApplicationAttempt( RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application Attempt " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - + FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); @@ -1014,8 +1039,8 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, @Override @Lock(Lock.NoLock.class) - public QueueInfo getQueueInfo(String queueName, - boolean includeChildQueues, boolean recursive) + public QueueInfo getQueueInfo(String queueName, + boolean includeChildQueues, boolean recursive) throws IOException { CSQueue queue = null; queue = this.queues.get(queueName); @@ -1048,7 +1073,7 @@ private synchronized void nodeUpdate(RMNode nm) { Resource releaseResources = Resource.newInstance(0, 0); FiCaSchedulerNode node = getNode(nm.getNodeID()); - + List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -1056,12 +1081,12 @@ private synchronized void nodeUpdate(RMNode nm) { newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); completedContainers.addAll(containerInfo.getCompletedContainers()); } - + // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); } - + // Processing the newly increased containers List newlyIncreasedContainers = nm.pullNewlyIncreasedContainers(); @@ -1118,18 +1143,18 @@ private synchronized void nodeUpdate(RMNode nm) { " availableResource: " + node.getUnallocatedResource()); } } - + /** * Process resource update on a node. */ - private synchronized void updateNodeAndQueueResource(RMNode nm, + private synchronized void updateNodeAndQueueResource(RMNode nm, ResourceOption resourceOption) { updateNodeResource(nm, resourceOption); Resource clusterResource = getClusterResource(); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); } - + /** * Process node labels update on a node. 
*/ @@ -1139,7 +1164,7 @@ private synchronized void updateLabelsOnNode(NodeId nodeId, if (null == node) { return; } - + // Get new partition, we have only one partition per node String newPartition; if (newLabels.isEmpty()) { @@ -1166,13 +1191,13 @@ private synchronized void updateLabelsOnNode(NodeId nodeId, continue; } } - + // Unreserve container on this node RMContainer reservedContainer = node.getReservedContainer(); if (null != reservedContainer) { killReservedContainer(reservedContainer); } - + // Update node labels after we've done this node.updateLabels(newLabels); } @@ -1186,53 +1211,32 @@ private void updateSchedulerHealth(long now, FiCaSchedulerNode node, List reservations = assignment.getAssignmentInformation().getReservationDetails(); if (!allocations.isEmpty()) { - ContainerId allocatedContainerId = - allocations.get(allocations.size() - 1).containerId; + ContainerId allocatedContainerId = allocations.get(allocations.size() - 1).containerId; String allocatedQueue = allocations.get(allocations.size() - 1).queue; schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId, - allocatedQueue); + allocatedQueue); } if (!reservations.isEmpty()) { - ContainerId reservedContainerId = - reservations.get(reservations.size() - 1).containerId; + ContainerId reservedContainerId = reservations.get(reservations.size() - 1).containerId; String reservedQueue = reservations.get(reservations.size() - 1).queue; schedulerHealth.updateReservation(now, nodeId, reservedContainerId, - reservedQueue); + reservedQueue); } - schedulerHealth.updateSchedulerReservationCounts(assignment - .getAssignmentInformation().getNumReservations()); - schedulerHealth.updateSchedulerAllocationCounts(assignment - .getAssignmentInformation().getNumAllocations()); - schedulerHealth.updateSchedulerRunDetails(now, assignment - .getAssignmentInformation().getAllocated(), assignment - .getAssignmentInformation().getReserved()); - } - - @VisibleForTesting - public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } - - if (!nodeTracker.exists(node.getNodeID())) { - LOG.info("Skipping scheduling as the node " + node.getNodeID() + - " has been removed"); - return; - } - - // reset allocation and reservation stats before we start doing any work - updateSchedulerHealth(lastNodeUpdateTime, node, - new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); - - CSAssignment assignment; - - // Assign new containers... - // 1. Check for reserved applications - // 2. 
Schedule if there are no reservations + schedulerHealth.updateSchedulerReservationCounts( + assignment.getAssignmentInformation().getNumReservations()); + schedulerHealth.updateSchedulerAllocationCounts( + assignment.getAssignmentInformation().getNumAllocations()); + schedulerHealth.updateSchedulerRunDetails(now, + assignment.getAssignmentInformation().getAllocated(), + assignment.getAssignmentInformation().getReserved()); + } + protected synchronized CSAssignment allocateOnReservedNode( + FiCaSchedulerNode node) { RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { + CSAssignment assignment; + FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(reservedContainer.getContainerId()); @@ -1245,7 +1249,7 @@ public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { assignment = queue.assignContainers( getClusterResource(), - node, + new PlacementSet(node, null, node.getPartition()), // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( @@ -1263,10 +1267,41 @@ public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { updateSchedulerHealth(lastNodeUpdateTime, node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); } + + return assignment; + } else { + return null; + } + } + + @VisibleForTesting + public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + + if (!nodeTracker.exists(node.getNodeID())) { + LOG.info("Skipping scheduling as the node " + node.getNodeID() + + " has been removed"); + return; + } + + // reset allocation and reservation stats before we start doing any work + updateSchedulerHealth(lastNodeUpdateTime, node, + new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); + + // Assign new containers... + // 1. Check for reserved applications + // 2. Schedule if there are no reservations + if (node.getReservedContainer() != null) { + allocateOnReservedNode(node); } // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { + CSAssignment assignment; + if (calculator.computeAvailableContainers(Resources .add(node.getUnallocatedResource(), node.getTotalKillableResources()), minimumAllocation) > 0) { @@ -1278,7 +1313,8 @@ public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { assignment = root.assignContainers( getClusterResource(), - node, + new PlacementSet<>(node, ImmutableMap.of(node.getNodeID(), node), + node.getPartition()), new ResourceLimits(labelManager.getResourceByLabel( node.getPartition(), getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -1287,13 +1323,13 @@ public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { updateSchedulerHealth(lastNodeUpdateTime, node, assignment); return; } - + // Only do non-exclusive allocation when node has node-labels. 
if (StringUtils.equals(node.getPartition(), RMNodeLabelsManager.NO_LABEL)) { return; } - + // Only do non-exclusive allocation when the node-label supports that try { if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( @@ -1305,11 +1341,12 @@ public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { + node.getPartition(), e); return; } - + // Try to use NON_EXCLUSIVE assignment = root.assignContainers( getClusterResource(), - node, + new PlacementSet<>(node, ImmutableMap.of(node.getNodeID(), node), + node.getPartition()), // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( @@ -1345,7 +1382,7 @@ public void handle(SchedulerEvent event) { break; case NODE_RESOURCE_UPDATE: { - NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = (NodeResourceUpdateSchedulerEvent)event; updateNodeAndQueueResource(nodeResourceUpdatedEvent.getRMNode(), nodeResourceUpdatedEvent.getResourceOption()); @@ -1355,7 +1392,7 @@ public void handle(SchedulerEvent event) { { NodeLabelsUpdateSchedulerEvent labelUpdateEvent = (NodeLabelsUpdateSchedulerEvent) event; - + for (Entry> entry : labelUpdateEvent .getUpdatedNodeToLabels().entrySet()) { NodeId id = entry.getKey(); @@ -1370,7 +1407,7 @@ public void handle(SchedulerEvent event) { RMNode node = nodeUpdatedEvent.getRMNode(); setLastNodeUpdateTime(Time.now()); nodeUpdate(node); - if (!scheduleAsynchronously) { + if (!scheduleAsynchronously && !scheduleGlobally) { allocateContainersToNode(getNode(node.getNodeID())); } } @@ -1419,7 +1456,7 @@ public void handle(SchedulerEvent event) { break; case CONTAINER_EXPIRED: { - ContainerExpiredSchedulerEvent containerExpiredEvent = + ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerId = containerExpiredEvent.getContainerId(); if (containerExpiredEvent.isIncrease()) { @@ -1486,7 +1523,7 @@ private synchronized void addNode(RMNode nodeManager) { root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); - LOG.info("Added node " + nodeManager.getNodeAddress() + + LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); if (scheduleAsynchronously && getNumClusterNodes() == 1) { @@ -1512,18 +1549,18 @@ private synchronized void removeNode(RMNode nodeInfo) { for (RMContainer container : runningContainers) { super.completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), + container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); } - + // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { super.completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus( - reservedContainer.getContainerId(), - SchedulerUtils.LOST_CONTAINER), + reservedContainer.getContainerId(), + SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); } @@ -1537,7 +1574,7 @@ private synchronized void removeNode(RMNode nodeInfo) { asyncSchedulerThread.suspendSchedule(); } - LOG.info("Removed node " + nodeInfo.getNodeAddress() + + LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + getClusterResource()); } @@ -1573,7 +1610,7 @@ protected void completedContainerInternal( RMContainerEventType event) { 
Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); - + // Get the application for the finished container FiCaSchedulerApp application = getCurrentAttemptForContainer(container.getId()); @@ -1584,16 +1621,16 @@ protected void completedContainerInternal( + appId + " completed with event " + event); return; } - + // Get the node on which the container was allocated FiCaSchedulerNode node = getNode(container.getNodeId()); - + // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); queue.completedContainer(getClusterResource(), application, node, rmContainer, containerStatus, event, null, true); } - + @Override protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest, SchedulerApplicationAttempt attempt) { @@ -1637,7 +1674,7 @@ public FiCaSchedulerNode getNode(NodeId nodeId) { public List getAllNodes() { return nodeTracker.getAllNodes(); } - + @Override @Lock(Lock.NoLock.class) public void recover(RMState state) throws Exception { @@ -1974,7 +2011,7 @@ private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { } return EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); } - + @Override public Resource getMaximumResourceCapability(String queueName) { CSQueue queue = getQueue(queueName); @@ -2134,4 +2171,8 @@ public PreemptionManager getPreemptionManager() { public ResourceUsage getClusterResourceUsage() { return root.getQueueResourceUsage(); } + + RMNodeLabelsManager getNodeLabelsManager() { + return labelManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d5d1374..f476d8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -202,6 +202,14 @@ SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable"; @Private + public static final String SCHEDULE_GLOBALLY_PREFIX = + PREFIX + "schedule-globally"; + + @Private + public static final String SCHEDULE_GLOBALLY_ENABLE = + SCHEDULE_GLOBALLY_PREFIX + ".enable"; + + @Private public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false; @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index b39b289..bab4b2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -80,4 +80,6 @@ * cluster. */ ResourceUsage getClusterResourceUsage(); + + boolean globalSchedulingEnabled(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GlobalSchedulingThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GlobalSchedulingThread.java new file mode 100644 index 0000000..f064f37 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GlobalSchedulingThread.java @@ -0,0 +1,146 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class GlobalSchedulingThread extends Thread { + private static final Log LOG = LogFactory.getLog(GlobalSchedulingThread.class); + + // Schedulable nodes exclude nodes that have no available resource + // and nodes that hold a reservation + private Map<NodeId, FiCaSchedulerNode> schedulableNodes; + private Map<NodeId, FiCaSchedulerNode> reservedNodes; + private Map<NodeId, FiCaSchedulerNode> exhaustedNodes; + + private long lastSyncNodeListVersion = -1; + private long lastRefreshNodeTS = -1; + + boolean scheduleOnReservedNodes = false; + int numContinuousNonReservedNodeScheduled = 0; + + private final CapacityScheduler cs; + + public GlobalSchedulingThread(CapacityScheduler cs) { + setName("=============== Global Scheduling ===================="); + this.cs = cs; + setDaemon(true); + + schedulableNodes = new ConcurrentHashMap<>(); + reservedNodes = new ConcurrentHashMap<>(); + exhaustedNodes = new ConcurrentHashMap<>(); + } + + @Override + public void run() {
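+ // Each pass: take the node-list read lock so nodes cannot be added or + // removed while scheduling, refresh the cached node sets if they are + // stale, then run one scheduling round over the cached nodes. The sleep + // happens outside the lock so node registration is never blocked by it.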
+ + while (true) { + java.util.concurrent.locks.Lock readLock = + cs.getNodeTracker().getNodeListReadLock(); + // Lock the node list to prevent modification while scheduling + readLock.lock(); + try { + // Refresh and sync from the scheduler when necessary + refreshNodesWhenNecessary(); + + // Do scheduling on cached nodes + schedule(); + } finally { + readLock.unlock(); + } + + sleep(); + } + } + + private void sleep() { + // Scale the sleep interval inversely with cluster size + // (default 100ms when the cluster is empty) + int sleepTime = 100; + if (cs.getNumClusterNodes() > 0) { + sleepTime = 1000 / cs.getNumClusterNodes(); + sleepTime = Math.max(sleepTime, 1); + } + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + + private void refreshNodesWhenNecessary() { + long now = System.currentTimeMillis(); + if (lastSyncNodeListVersion != cs.getNodeTracker() + .getNodeListVersion() || now - lastRefreshNodeTS >= 3000) { + // Force refresh nodes + schedulableNodes.clear(); + exhaustedNodes.clear(); + reservedNodes.clear(); + + for (FiCaSchedulerNode node : cs.getAllNodes()) { + if (Resources.lessThanOrEqual(cs.getResourceCalculator(), + cs.getClusterResource(), node.getUnallocatedResource(), + Resources.none())) { + // Exhausted nodes + exhaustedNodes.put(node.getNodeID(), node); + } else if (node.getReservedContainer() != null) { + // Reserved nodes + reservedNodes.put(node.getNodeID(), node); + } else { + // Schedulable nodes (has available resource and not reserved) + schedulableNodes.put(node.getNodeID(), node); + } + } + + LOG.info( + "Refreshed nodes, now schedulable nodes = " + schedulableNodes.size()); + + lastSyncNodeListVersion = cs.getNodeTracker().getNodeListVersion(); + lastRefreshNodeTS = System.currentTimeMillis(); + } + } + + private void schedule() { + if (scheduleOnReservedNodes) { + for (FiCaSchedulerNode node : reservedNodes.values()) { + // The reservation may have been fulfilled since the last refresh + if (node.getReservedContainer() != null) { + cs.allocateOnReservedNode(node); + } + } + scheduleOnReservedNodes = false; + } else { + cs.getRootQueue().assignContainers(cs.getClusterResource(), + new PlacementSet<>(null, schedulableNodes, + RMNodeLabelsManager.NO_LABEL), new ResourceLimits( + cs.getNodeLabelsManager() + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, + cs.getClusterResource())), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + + for (String partition : cs.getRMContext().getNodeLabelManager() + .getClusterNodeLabelNames()) { + cs.getRootQueue().assignContainers(cs.getClusterResource(), + new PlacementSet<>(null, schedulableNodes, partition), + new ResourceLimits(cs.getNodeLabelsManager() + .getResourceByLabel(partition, cs.getClusterResource())), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + } + + // Decide whether to do reserved allocation in the next round + numContinuousNonReservedNodeScheduled++; + + if (numContinuousNonReservedNodeScheduled >= schedulableNodes.size()) { + scheduleOnReservedNodes = true; + numContinuousNonReservedNodeScheduled = 0; + } + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9aae909..97fe912 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; @@ -807,9 +808,12 @@ private synchronized FiCaSchedulerApp getApplication( } private void handleExcessReservedContainer(Resource clusterResource, - CSAssignment assignment, FiCaSchedulerNode node, FiCaSchedulerApp app) { + CSAssignment assignment) { if (assignment.getExcessReservation() != null) { RMContainer excessReservedContainer = assignment.getExcessReservation(); + FiCaSchedulerNode node = csContext.getNode( + excessReservedContainer.getReservedNode()); + FiCaSchedulerApp app = assignment.getApplication(); if (excessReservedContainer.hasIncreaseReservation()) { unreserveIncreasedContainer(clusterResource, @@ -829,7 +833,6 @@ private void handleExcessReservedContainer(Resource clusterResource, } private void killToPreemptContainers(Resource clusterResource, - FiCaSchedulerNode node, CSAssignment assignment) { if (assignment.getContainersToKill() != null) { StringBuilder sb = new StringBuilder("Killing containers: ["); @@ -838,6 +841,7 @@ private void killToPreemptContainers(Resource clusterResource, FiCaSchedulerApp application = csContext.getApplicationAttempt( c.getApplicationAttemptId()); LeafQueue q = application.getCSLeafQueue(); + FiCaSchedulerNode node = csContext.getNode(c.getAllocatedNode()); q.completedContainer(clusterResource, application, node, c, SchedulerUtils .createPreemptedContainerStatus(c.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, @@ -861,50 +865,69 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); } + + private synchronized CSAssignment handleReservedContainer( + Resource clusterResource, PlacementSet placementSet, + ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { + FiCaSchedulerNode node = + (FiCaSchedulerNode) placementSet.getNextAvailable(); + if (null == node) { + return null; + } + + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + FiCaSchedulerApp application = getApplication( + reservedContainer.getApplicationAttemptId()); + synchronized (application) { + CSAssignment assignment = application.assignContainers(clusterResource, + placementSet, currentResourceLimits, schedulingMode, + reservedContainer); + handleExcessReservedContainer(clusterResource, assignment); + killToPreemptContainers(clusterResource, assignment); + return assignment; + } + } + + return null; + } 
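+ + // Under global scheduling, the PlacementSet handed to assignContainers() + // may carry many candidate nodes; handleReservedContainer() above still + // only inspects the next available node, so reserved containers are + // handled one node at a time.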
@Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits currentResourceLimits, + PlacementSet placementSet, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { updateCurrentResourceLimits(currentResourceLimits, clusterResource); if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getNodeName() - + " #applications=" + orderingPolicy.getNumSchedulableEntities()); + LOG.debug("assignContainers: nodePartition=" + placementSet + .getPartition() + " #applications=" + orderingPolicy + .getNumSchedulableEntities()); } - setPreemptionAllowed(currentResourceLimits, node.getPartition()); + setPreemptionAllowed(currentResourceLimits, + placementSet.getPartition()); // Check for reserved resources - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FiCaSchedulerApp application = - getApplication(reservedContainer.getApplicationAttemptId()); - synchronized (application) { - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, reservedContainer); - handleExcessReservedContainer(clusterResource, assignment, node, - application); - killToPreemptContainers(clusterResource, node, assignment); - return assignment; - } + CSAssignment assignment = handleReservedContainer(clusterResource, + placementSet, currentResourceLimits, schedulingMode); + if (null != assignment) { + return assignment; } // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { + && !accessibleToPartition(placementSet.getPartition())) { return CSAssignment.NULL_ASSIGNMENT; } // Check if this queue need more resource, simply skip allocation if this // queue doesn't need more resources. 
- if (!hasPendingResourceRequest(node.getPartition(), clusterResource, - schedulingMode)) { + if (!hasPendingResourceRequest(placementSet.getPartition(), + clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node.getPartition()); + + schedulingMode.name() + " node-partition=" + placementSet.getPartition()); } return CSAssignment.NULL_ASSIGNMENT; } @@ -914,7 +937,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerApp application = assignmentIterator.next(); // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + if (!super.canAssignToThisQueue(clusterResource, placementSet.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { return CSAssignment.NULL_ASSIGNMENT; @@ -922,20 +945,18 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, - node.getPartition(), schedulingMode); + placementSet.getPartition(), schedulingMode); // Check user limit if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, node.getPartition(), currentResourceLimits)) { + application, placementSet.getPartition(), currentResourceLimits)) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); continue; } // Try to schedule - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, null); + assignment = application.assignContainers(clusterResource, placementSet, currentResourceLimits, schedulingMode, null); if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " @@ -946,9 +967,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Did we schedule or reserve a container? Resource assigned = assignment.getResource(); - handleExcessReservedContainer(clusterResource, assignment, node, - application); - killToPreemptContainers(clusterResource, node, assignment); + handleExcessReservedContainer(clusterResource, assignment); + killToPreemptContainers(clusterResource, assignment); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -960,20 +980,20 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Book-keeping // Note: Update headroom to account for current allocation too... 
allocateResource(clusterResource, application, assigned, - node.getPartition(), reservedOrAllocatedRMContainer, + placementSet.getPartition(), reservedOrAllocatedRMContainer, assignment.isIncreasedAllocation()); // Update reserved metrics Resource reservedRes = assignment.getAssignmentInformation() .getReserved(); if (reservedRes != null && !reservedRes.equals(Resources.none())) { - incReservedResource(node.getPartition(), reservedRes); + incReservedResource(placementSet.getPartition(), reservedRes); } // Done return assignment; } else if (assignment.getSkipped()) { - application.updateNodeInfoForAMDiagnostics(node); + application.updateNodeInfoForAMDiagnostics(placementSet); } else { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6fcd6c1..3784184 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -383,35 +383,37 @@ private synchronized void removeApplication(ApplicationId applicationId, @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits resourceLimits, + PlacementSet placementSet, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { + String partition = placementSet.getPartition(); + // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { + && !accessibleToPartition(placementSet.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + node - .getPartition()); + + ", because it is not able to access partition=" + partition); } return 
CSAssignment.NULL_ASSIGNMENT; } // Check if this queue needs more resources; simply skip allocation if this // queue doesn't need more resources. - if (!super.hasPendingResourceRequest(node.getPartition(), - clusterResource, schedulingMode)) { + if (!super.hasPendingResourceRequest(partition, clusterResource, + schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node.getPartition()); + + schedulingMode.name() + " node-partition=" + partition); } return CSAssignment.NULL_ASSIGNMENT; } CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - while (canAssign(clusterResource, node)) { + + while (canAssign(clusterResource, + (FiCaSchedulerNode) placementSet.getNextAvailable())) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign containers to child-queue of " + getQueueName()); @@ -420,7 +422,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Are we over maximum-capacity for this queue? // This will also consider parent's limits and also continuous reservation // looking - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + if (!super.canAssignToThisQueue(clusterResource, partition, resourceLimits, Resources.createResource( getMetrics().getReservedMB(), getMetrics() .getReservedVirtualCores()), schedulingMode)) { @@ -429,7 +431,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Schedule CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, resourceLimits, + assignContainersToChildQueues(clusterResource, placementSet, resourceLimits, schedulingMode); assignment.setType(assignedToChild.getType()); @@ -439,7 +441,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue allocateResource(clusterResource, assignedToChild.getResource(), - node.getPartition(), assignedToChild.isIncreasedAllocation()); + partition, assignedToChild.isIncreasedAllocation()); // Track resource utilization in this pass of the scheduler Resources @@ -500,7 +502,15 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, return assignment; } + // FIXME: + // Check the next node's available resource only at the root queue, and check + // the queue's available resource for the partition private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { + // Always return true when global scheduling is enabled. + if (csContext.globalSchedulingEnabled()) { + return true; + } + // Two conditions need to be met when trying to allocate: // 1) Node doesn't have reserved container // 2) Node's available-resource + killable-resource should > 0 @@ -546,8 +556,8 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, return new ResourceLimits(childLimit); } - private Iterator sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) { - if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + private Iterator sortAndGetChildrenAllocationIterator(String partition) { + if (partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (needToResortQueuesAtNextAllocation) { // If we skipped re-sorting queues last time, we need to re-sort them // before allocation @@ -559,23 +569,25 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, return childQueues.iterator(); } - 
partitionQueueComparator.setPartitionToLookAt(node.getPartition()); + partitionQueueComparator.setPartitionToLookAt(partition); List childrenList = new ArrayList<>(childQueues); Collections.sort(childrenList, partitionQueueComparator); return childrenList.iterator(); } - + private synchronized CSAssignment assignContainersToChildQueues( - Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, - SchedulingMode schedulingMode) { + Resource cluster, PlacementSet placementSet, + ResourceLimits limits, SchedulingMode schedulingMode) { + String partition = placementSet.getPartition(); + CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); printChildQueues(); // Try to assign to most 'under-served' sub-queue - for (Iterator iter = sortAndGetChildrenAllocationIterator(node); iter - .hasNext();) { + for (Iterator iter = sortAndGetChildrenAllocationIterator( + partition); iter.hasNext(); ) { CSQueue childQueue = iter.next(); if(LOG.isDebugEnabled()) { LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() @@ -584,9 +596,9 @@ private synchronized CSAssignment assignContainersToChildQueues( // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); + getResourceLimitsOfChild(childQueue, cluster, limits, partition); - assignment = childQueue.assignContainers(cluster, node, + assignment = childQueue.assignContainers(cluster, placementSet, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + @@ -600,7 +612,7 @@ private synchronized CSAssignment assignContainersToChildQueues( assignment.getResource(), Resources.none())) { // Only update childQueues when we doing non-partitioned node // allocation. 
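The sorted child iterator above hands back the most under-served child first for the partition being scheduled. A minimal, self-contained sketch of that idea, with one used-capacity number standing in for the per-partition QueueCapacities (names here are illustrative, not YARN APIs):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class ChildQueueSortDemo {
    // Illustrative stand-in for a CSQueue.
    static final class ChildQueue {
        final String name;
        final double usedCapacity;
        ChildQueue(String name, double usedCapacity) {
            this.name = name;
            this.usedCapacity = usedCapacity;
        }
    }

    // Snapshot the children, order them most-under-served first for the
    // partition being scheduled, and iterate the snapshot.
    static Iterator<ChildQueue> sortAndGetChildrenAllocationIterator(
            List<ChildQueue> children) {
        List<ChildQueue> copy = new ArrayList<>(children);
        copy.sort(Comparator.comparingDouble(q -> q.usedCapacity));
        return copy.iterator();
    }

    public static void main(String[] args) {
        List<ChildQueue> children = new ArrayList<>();
        children.add(new ChildQueue("a", 0.8));
        children.add(new ChildQueue("b", 0.2));
        // Prints b (20% used) before a (80% used).
        sortAndGetChildrenAllocationIterator(children)
            .forEachRemaining(q -> System.out.println(q.name));
    }
}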
- if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) { + if (RMNodeLabelsManager.NO_LABEL.equals(partition)) { // Remove and re-insert to sort iter.remove(); LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index afac235..427b898 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -128,6 +128,6 @@ protected CSAssignment getCSAssignmentFromAllocateResult( * */ public abstract CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet placementSet, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index 8f749f6..0a9ab7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.List; @@ -60,6 +62,7 @@ NodeType requestNodeType = 
NodeType.NODE_LOCAL; Container updatedContainer; private List toKillContainers; + FiCaSchedulerNode nodeToAllocate; public ContainerAllocation(RMContainer containerToBeUnreserved, Resource resourceToBeAllocated, AllocationState state) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 3be8e0e..02c5cf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -25,8 +25,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -46,17 +46,17 @@ public ContainerAllocator(FiCaSchedulerApp application, @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet placementSet, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { if (reservedContainer != null) { if (reservedContainer.getState() == RMContainerState.RESERVED) { // It's a regular container return regularContainerAllocator.assignContainers(clusterResource, - node, schedulingMode, resourceLimits, reservedContainer); + placementSet, schedulingMode, resourceLimits, reservedContainer); } else { // It's a increase container return increaseContainerAllocator.assignContainers(clusterResource, - node, schedulingMode, resourceLimits, reservedContainer); + placementSet, schedulingMode, resourceLimits, reservedContainer); } } else { /* @@ -64,14 +64,16 @@ public CSAssignment assignContainers(Resource clusterResource, * anything, we will try to allocate regular container */ CSAssignment assign = - increaseContainerAllocator.assignContainers(clusterResource, node, + increaseContainerAllocator.assignContainers(clusterResource, + placementSet, schedulingMode, resourceLimits, null); if (Resources.greaterThan(rc, clusterResource, assign.getResource(), Resources.none())) { return assign; } - return regularContainerAllocator.assignContainers(clusterResource, node, + return regularContainerAllocator.assignContainers(clusterResource, + placementSet, schedulingMode, resourceLimits, null); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 4a2ae18..18cb3c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -175,17 +176,23 @@ private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node, @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet placementSet, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { AppSchedulingInfo sinfo = application.getAppSchedulingInfo(); - NodeId nodeId = node.getNodeID(); + if (null == placementSet.getNextAvailable()) { + // TODO, fix IncreaseContainerAllocator to be able to schedule for + // global scheduling + return CSAssignment.SKIP_ASSIGNMENT; + } + + NodeId nodeId = placementSet.getNextAvailable().getNodeID(); if (reservedContainer == null) { // Do we have increase request on this node? if (!sinfo.hasIncreaseRequest(nodeId)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip allocating increase request since we don't have any" - + " increase request on this node=" + node.getNodeID()); + + " increase request on this node=" + nodeId); } return CSAssignment.SKIP_ASSIGNMENT; @@ -294,7 +301,8 @@ public CSAssignment assignContainers(Resource clusterResource, } if (!Resources.fitsIn(rc, clusterResource, - increaseRequest.getTargetCapacity(), node.getTotalResource())) { + increaseRequest.getTargetCapacity(), + placementSet.getNextAvailable().getTotalResource())) { // if the target capacity is more than what the node can offer, we // will simply remove and skip it. // The reason of doing check here instead of adding increase request @@ -302,15 +310,18 @@ public CSAssignment assignContainers(Resource clusterResource, // request added. if (LOG.isDebugEnabled()) { LOG.debug(" Target capacity is more than what node can offer," - + " node.resource=" + node.getTotalResource()); + + " node.resource=" + placementSet.getNextAvailable() + .getTotalResource()); } toBeRemovedRequests.add(increaseRequest); continue; } // Try to allocate the increase request - assigned = - allocateIncreaseRequest(node, clusterResource, increaseRequest); + assigned = allocateIncreaseRequest( + (FiCaSchedulerNode) placementSet.getNextAvailable(), + clusterResource, + increaseRequest); if (!assigned.getSkipped()) { // When we don't skip this request, which means we either allocated // OR reserved this request. 
We will break @@ -362,9 +373,9 @@ public CSAssignment assignContainers(Resource clusterResource, // We don't need this container now, just return excessive reservation return new CSAssignment(application, reservedContainer); } - - return allocateIncreaseRequestFromReservedContainer(node, clusterResource, - request); + + return allocateIncreaseRequestFromReservedContainer( + placementSet.getNextAvailable(), clusterResource, request); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 29b37d8..13d3cd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; +import org.apache.commons.collections.IteratorUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,21 +33,22 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; - - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer.SchedulerNodesScorer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer.SchedulerNodesScorerCache; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; /** @@ -54,43 +56,37 @@ * delayed scheduling mechanism to get better locality allocation. */ public class RegularContainerAllocator extends AbstractContainerAllocator { - private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); - + private static final Log LOG = LogFactory.getLog( + RegularContainerAllocator.class); + private ResourceRequest lastResourceRequest = null; - + public RegularContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { super(application, rc, rmContext); } - - private boolean checkHeadroom(Resource clusterResource, + + private boolean checkHeadroomForPartition(Resource clusterResource, ResourceLimits currentResourceLimits, Resource required, - FiCaSchedulerNode node) { + String partition) { // If headroom + currentReservation < required, we cannot allocate this // request Resource resourceCouldBeUnReserved = application.getCurrentReservation(); if (!application.getCSLeafQueue().getReservationContinueLooking() - || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + || !partition.equals(RMNodeLabelsManager.NO_LABEL)) { // If we don't allow reservation continuous looking, OR we're looking at // non-default node partition, we won't allow unreserving before // allocation. 
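A minimal sketch of checkHeadroomForPartition's logic, with plain longs standing in for Resource and the ResourceCalculator, and the empty string standing in for RMNodeLabelsManager.NO_LABEL: the application's current reservation only counts toward headroom when continue-looking is enabled and the default partition is being scheduled.

public class HeadroomCheckDemo {
    // headroom, required and currentReservation are memory amounts in MB.
    static boolean checkHeadroomForPartition(long headroom, long required,
            long currentReservation, boolean reservationContinueLooking,
            String partition) {
        // A reservation may only be "borrowed back" (unreserved) when
        // continue-looking is on and the default partition ("") is scheduled.
        long couldBeUnreserved =
            (reservationContinueLooking && partition.isEmpty())
                ? currentReservation : 0L;
        return headroom + couldBeUnreserved >= required;
    }

    public static void main(String[] args) {
        // 4GB headroom plus a 2GB reservation that could be unreserved
        // covers a 5GB ask.
        System.out.println(
            checkHeadroomForPartition(4096, 5120, 2048, true, ""));  // true
        // On a labeled partition the reservation cannot be counted.
        System.out.println(
            checkHeadroomForPartition(4096, 5120, 2048, true, "x")); // false
    }
}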
resourceCouldBeUnReserved = Resources.none(); } - return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add( - currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), + return Resources.greaterThanOrEqual(rc, clusterResource, Resources + .add(currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), required); } - private ContainerAllocation preCheckForNewContainer(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + String partition, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) { - if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { - application.updateAppSkipNodeDiagnostics( - CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); - return ContainerAllocation.APP_SKIPPED; - } - ResourceRequest anyRequest = application.getResourceRequest(schedulerKey, ResourceRequest.ANY); if (null == anyRequest) { @@ -110,8 +106,8 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { if (application.isWaitingForAMContainer()) { if (LOG.isDebugEnabled()) { - LOG.debug("Skip allocating AM container to app_attempt=" - + application.getApplicationAttemptId() + LOG.debug("Skip allocating AM container to app_attempt=" + application + .getApplicationAttemptId() + ", don't allow to allocate AM container in non-exclusive mode"); } application.updateAppSkipNodeDiagnostics( @@ -124,8 +120,7 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, // matches the node's label? // If not match, jump to next priority. if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - anyRequest.getNodeLabelExpression(), node.getPartition(), - schedulingMode)) { + anyRequest.getNodeLabelExpression(), partition, schedulingMode)) { return ContainerAllocation.PRIORITY_SKIPPED; } @@ -138,7 +133,8 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, } } - if (!checkHeadroom(clusterResource, resourceLimits, required, node)) { + if (!checkHeadroomForPartition(clusterResource, resourceLimits, required, + partition)) { if (LOG.isDebugEnabled()) { LOG.debug("cannot allocate required resource=" + required + " because of headroom"); @@ -153,8 +149,8 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, // This is to make sure non-partitioned-resource-request will prefer // to be allocated to non-partitioned nodes int missedNonPartitionedRequestSchedulingOpportunity = 0; - if (anyRequest.getNodeLabelExpression() - .equals(RMNodeLabelsManager.NO_LABEL)) { + if (anyRequest.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL)) { missedNonPartitionedRequestSchedulingOpportunity = application.addMissedNonPartitionedRequestSchedulingOpportunity( schedulerKey); @@ -178,7 +174,7 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, return ContainerAllocation.APP_SKIPPED; } } - + return null; } @@ -186,16 +182,7 @@ ContainerAllocation preAllocation(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { - ContainerAllocation result; - if (null == reservedContainer) { - // pre-check when allocating new container - result = - preCheckForNewContainer(clusterResource, node, schedulingMode, - resourceLimits, schedulerKey); - if (null != result) { - return result; - 
} - } else { + if (null != reservedContainer) { // pre-check when allocating reserved container if (application.getTotalRequiredResources(schedulerKey) == 0) { // Release @@ -204,18 +191,20 @@ ContainerAllocation preAllocation(Resource clusterResource, } } + ContainerAllocation result; // Try to allocate containers on node result = assignContainersOnNode(clusterResource, node, schedulerKey, reservedContainer, schedulingMode, resourceLimits); - + if (null == reservedContainer) { if (result.state == AllocationState.PRIORITY_SKIPPED) { // Don't count 'skipped nodes' as a scheduling opportunity! application.subtractSchedulingOpportunity(schedulerKey); } } - + result.nodeToAllocate = node; + return result; } @@ -224,15 +213,15 @@ public synchronized float getLocalityWaitFactor( // Estimate: Required unique resources (i.e. hosts + racks) int requiredResources = Math.max(application.getResourceRequests(schedulerKey).size() - 1, 0); - + // waitFactor can't be more than '1' // i.e. no point skipping more than clustersize opportunities - return Math.min(((float)requiredResources / clusterNodes), 1.0f); + return Math.min(((float) requiredResources / clusterNodes), 1.0f); } - + private int getActualNodeLocalityDelay() { - return Math.min(rmContext.getScheduler().getNumClusterNodes(), application - .getCSLeafQueue().getNodeLocalityDelay()); + return Math.min(rmContext.getScheduler().getNumClusterNodes(), + application.getCSLeafQueue().getNodeLocalityDelay()); } private boolean canAssign(SchedulerRequestKey schedulerKey, @@ -336,9 +325,13 @@ private ContainerAllocation assignOffSwitchContainers( } private ContainerAllocation assignContainersOnNode(Resource clusterResource, - FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, - RMContainer reservedContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { + FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { + application.updateAppSkipNodeDiagnostics( + CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); + return ContainerAllocation.APP_SKIPPED; + } ContainerAllocation allocation; @@ -399,7 +392,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); allocation.requestNodeType = requestType; - + // When a returned allocation is LOCALITY_SKIPPED, since we're in // off-switch request now, we will skip this app w.r.t priorities if (allocation.state == AllocationState.LOCALITY_SKIPPED) { @@ -417,7 +410,7 @@ private ContainerAllocation assignContainer(Resource clusterResource, ResourceRequest request, NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { lastResourceRequest = request; - + if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() @@ -440,8 +433,8 @@ private ContainerAllocation assignContainer(Resource clusterResource, Resource available = node.getUnallocatedResource(); Resource totalResource = node.getTotalResource(); - if (!Resources.lessThanOrEqual(rc, clusterResource, - capability, totalResource)) { + if (!Resources.lessThanOrEqual(rc, clusterResource, capability, + totalResource)) { LOG.warn("Node : " + node.getNodeID() + " does not have sufficient resource for request : 
" + request + " node total capability : " + node.getTotalResource()); @@ -458,14 +451,12 @@ private ContainerAllocation assignContainer(Resource clusterResource, // How much need to unreserve equals to: // max(required - headroom, amountNeedUnreserve) - Resource resourceNeedToUnReserve = - Resources.max(rc, clusterResource, - Resources.subtract(capability, currentResoureLimits.getHeadroom()), - currentResoureLimits.getAmountNeededUnreserve()); + Resource resourceNeedToUnReserve = Resources.max(rc, clusterResource, + Resources.subtract(capability, currentResoureLimits.getHeadroom()), + currentResoureLimits.getAmountNeededUnreserve()); - boolean needToUnreserve = - Resources.greaterThan(rc, clusterResource, - resourceNeedToUnReserve, Resources.none()); + boolean needToUnreserve = Resources.greaterThan(rc, clusterResource, + resourceNeedToUnReserve, Resources.none()); RMContainer unreservedContainer = null; boolean reservationsContinueLooking = @@ -475,18 +466,16 @@ private ContainerAllocation assignContainer(Resource clusterResource, List toKillContainers = null; if (availableContainers == 0 && currentResoureLimits.isAllowPreemption()) { Resource availableAndKillable = Resources.clone(available); - for (RMContainer killableContainer : node - .getKillableContainers().values()) { + for (RMContainer killableContainer : node.getKillableContainers() + .values()) { if (null == toKillContainers) { toKillContainers = new ArrayList<>(); } toKillContainers.add(killableContainer); Resources.addTo(availableAndKillable, - killableContainer.getAllocatedResource()); - if (Resources.fitsIn(rc, - clusterResource, - capability, - availableAndKillable)) { + killableContainer.getAllocatedResource()); + if (Resources.fitsIn(rc, clusterResource, capability, + availableAndKillable)) { // Stop if we find enough spaces availableContainers = 1; break; @@ -498,8 +487,8 @@ private ContainerAllocation assignContainer(Resource clusterResource, // Allocate... // We will only do continuous reservation when this is not allocated from // reserved container - if (rmContainer == null && reservationsContinueLooking - && node.getLabels().isEmpty()) { + if (rmContainer == null && reservationsContinueLooking && node.getLabels() + .isEmpty()) { // when reservationsContinueLooking is set, we may need to unreserve // some containers to meet this queue, its parents', or the users' // resource limits. @@ -514,9 +503,11 @@ private ContainerAllocation assignContainer(Resource clusterResource, // under the limit. 
resourceNeedToUnReserve = capability; } + unreservedContainer = application.findNodeToUnreserve(clusterResource, node, schedulerKey, resourceNeedToUnReserve); + // When (minimum-unreserved-resource > 0 OR we cannot allocate // new/reserved // container (That means we *have to* unreserve some resource to @@ -529,9 +520,8 @@ private ContainerAllocation assignContainer(Resource clusterResource, } } - ContainerAllocation result = - new ContainerAllocation(unreservedContainer, request.getCapability(), - AllocationState.ALLOCATED); + ContainerAllocation result = new ContainerAllocation(unreservedContainer, + request.getCapability(), AllocationState.ALLOCATED); result.containerNodeType = type; result.setToKillContainers(toKillContainers); return result; @@ -548,19 +538,18 @@ private ContainerAllocation assignContainer(Resource clusterResource, LOG.debug("we needed to unreserve to be able to allocate"); } // Skip the locality request - return ContainerAllocation.LOCALITY_SKIPPED; + return ContainerAllocation.LOCALITY_SKIPPED; } } - ContainerAllocation result = - new ContainerAllocation(null, request.getCapability(), - AllocationState.RESERVED); + ContainerAllocation result = new ContainerAllocation(null, + request.getCapability(), AllocationState.RESERVED); result.containerNodeType = type; result.setToKillContainers(null); return result; } // Skip the locality request - return ContainerAllocation.LOCALITY_SKIPPED; + return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -594,7 +583,7 @@ boolean shouldAllocOrReserveNewContainer( } return (((starvation + requiredContainers) - reservedContainers) > 0); } - + private Container getContainer(RMContainer rmContainer, FiCaSchedulerNode node, Resource capability, SchedulerRequestKey schedulerKey) { @@ -606,15 +595,14 @@ private Container createContainer(FiCaSchedulerNode node, Resource capability, SchedulerRequestKey schedulerKey) { NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = - BuilderUtils.newContainerId(application.getApplicationAttemptId(), - application.getNewContainerId()); + ContainerId containerId = BuilderUtils.newContainerId( + application.getApplicationAttemptId(), application.getNewContainerId()); // Create the container return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() .getHttpAddress(), capability, schedulerKey.getPriority(), null); } - + private ContainerAllocation handleNewContainerAllocation( ContainerAllocation allocationResult, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, @@ -624,7 +612,7 @@ private ContainerAllocation handleNewContainerAllocation( if (reservedContainer != null) { application.unreserve(schedulerKey, node, reservedContainer); } - + // Inform the application RMContainer allocatedContainer = application.allocate(allocationResult.containerNodeType, node, @@ -633,20 +621,20 @@ private ContainerAllocation handleNewContainerAllocation( // Does the application need this resource? if (allocatedContainer == null) { // Skip this app if we failed to allocate. 
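Earlier in this hunk, shouldAllocOrReserveNewContainer keeps its gate: a new container may be allocated or reserved only while (starvation + requiredContainers) - reservedContainers > 0. A tiny self-contained example of that arithmetic:

public class StarvationCheckDemo {
    // Mirrors the boolean test in shouldAllocOrReserveNewContainer.
    static boolean shouldAllocOrReserve(int starvation,
            int requiredContainers, int reservedContainers) {
        return ((starvation + requiredContainers) - reservedContainers) > 0;
    }

    public static void main(String[] args) {
        // Demand fully covered by reservations: do not add another one.
        System.out.println(shouldAllocOrReserve(0, 3, 3)); // false
        // A starvation allowance re-opens the door to one more reservation.
        System.out.println(shouldAllocOrReserve(1, 3, 3)); // true
    }
}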
- ContainerAllocation ret = - new ContainerAllocation(allocationResult.containerToBeUnreserved, - null, AllocationState.APP_SKIPPED); + ContainerAllocation ret = new ContainerAllocation( + allocationResult.containerToBeUnreserved, null, + AllocationState.APP_SKIPPED); return ret; } // Inform the node node.allocateContainer(allocatedContainer); - + // update locality statistics application.incNumAllocatedContainers(allocationResult.containerNodeType, allocationResult.requestNodeType); - - return allocationResult; + + return allocationResult; } ContainerAllocation doAllocation(ContainerAllocation allocationResult, @@ -659,8 +647,8 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, // something went wrong getting/creating the container if (container == null) { - application - .updateAppSkipNodeDiagnostics("Scheduling of container failed. "); + application.updateAppSkipNodeDiagnostics( + "Scheduling of container failed. "); LOG.warn("Couldn't get container for allocation!"); return ContainerAllocation.APP_SKIPPED; } @@ -712,59 +700,96 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, } private ContainerAllocation allocate(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet placementSet, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { - ContainerAllocation result = - preAllocation(clusterResource, node, schedulingMode, resourceLimits, - schedulerKey, reservedContainer); + // Check partition resource + if (null == reservedContainer) { + // pre-check when allocating a new container + ContainerAllocation result = preCheckForNewContainer(clusterResource, + placementSet.getPartition(), schedulingMode, resourceLimits, + schedulerKey); + if (null != result) { + return result; + } + } - if (AllocationState.ALLOCATED == result.state - || AllocationState.RESERVED == result.state) { - result = doAllocation(result, node, schedulerKey, reservedContainer); + // When trying to allocate a reserved container, only look at the reserved + // node; otherwise look at nodes in the order given by the scorer + Iterator iter; + if (null == reservedContainer && application.getCapacitySchedulerContext() + .globalSchedulingEnabled()) { + SchedulerNodesScorer scorer = + SchedulerNodesScorerCache.getOrCreateScorer(application, schedulerKey); + iter = scorer.scorePlacementSet(placementSet); + } else { + iter = IteratorUtils.singletonIterator( + placementSet.getNextAvailable()); + } + + ContainerAllocation result = ContainerAllocation.PRIORITY_SKIPPED; + while (iter.hasNext()) { + FiCaSchedulerNode node = iter.next(); + + // Skip nodes with <= 0 available resources, and skip nodes that hold a + // reservation when we are not allocating from a reserved container + if ((node.getReservedContainer() != null && reservedContainer == null) + || Resources.lessThanOrEqual(rc, clusterResource, + node.getUnallocatedResource(), Resources.none())) { + continue; + } + + // FIXME: part of preAllocation can be extracted to avoid repeating the + // resource-request checks across nodes. 
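The new while loop is the heart of the global-scheduling change: instead of being handed a single node, the allocator walks candidates in scorer order and stops at the first success. A condensed, self-contained sketch of that loop (stand-in types, not YARN APIs):

import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.Predicate;

public class ScoredAllocationLoopDemo {
    enum State { ALLOCATED, RESERVED, SKIPPED }

    // Walk candidate nodes in scorer order, skip unusable ones, and stop
    // at the first node where the attempt allocates or reserves.
    static <N> State allocateLoop(Iterator<N> scoredNodes,
            Predicate<N> usable, Function<N, State> tryAllocate) {
        State result = State.SKIPPED;
        while (scoredNodes.hasNext()) {
            N node = scoredNodes.next();
            if (!usable.test(node)) {
                continue; // reserved by someone else, or no free resources
            }
            result = tryAllocate.apply(node);
            if (result == State.ALLOCATED || result == State.RESERVED) {
                break; // first success wins
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Iterator<String> nodes = Arrays.asList("h1", "h2").iterator();
        // h1 is unusable, so the loop allocates on h2: prints ALLOCATED.
        System.out.println(allocateLoop(nodes,
            n -> !n.equals("h1"), n -> State.ALLOCATED));
    }
}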
+ result = preAllocation(clusterResource, node, schedulingMode, + resourceLimits, schedulerKey, reservedContainer); + + if (AllocationState.ALLOCATED == result.state + || AllocationState.RESERVED == result.state) { + result = doAllocation(result, node, schedulerKey, reservedContainer); + break; + } } return result; } - + @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, - RMContainer reservedContainer) { + PlacementSet placementSet, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, RMContainer reservedContainer) { if (reservedContainer == null) { // Check if application needs more resource, skip if it doesn't need more. if (!application.hasPendingResourceRequest(rc, - node.getPartition(), clusterResource, schedulingMode)) { + placementSet.getPartition(), clusterResource, + schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-label=" + node.getPartition()); + + schedulingMode.name() + " node-label=" + placementSet + .getPartition()); } return CSAssignment.SKIP_ASSIGNMENT; } - + // Schedule in priority order for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { - ContainerAllocation result = - allocate(clusterResource, node, schedulingMode, resourceLimits, - schedulerKey, null); + ContainerAllocation result = allocate(clusterResource, placementSet, schedulingMode, resourceLimits, schedulerKey, + null); AllocationState allocationState = result.getAllocationState(); if (allocationState == AllocationState.PRIORITY_SKIPPED) { continue; } - return getCSAssignmentFromAllocateResult(clusterResource, result, - null); + return getCSAssignmentFromAllocateResult(clusterResource, result, null); } // We will reach here if we skipped all priorities of the app, so we will // skip the app. return CSAssignment.SKIP_ASSIGNMENT; } else { - ContainerAllocation result = - allocate(clusterResource, node, schedulingMode, resourceLimits, - reservedContainer.getReservedSchedulerKey(), reservedContainer); + ContainerAllocation result = allocate(clusterResource, placementSet, schedulingMode, resourceLimits, + reservedContainer.getReservedSchedulerKey(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, reservedContainer); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PlacementSet.java new file mode 100644 index 0000000..3ad7c4d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PlacementSet.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class PlacementSet { + private N nextAvailable; + private Map allSchedulableNodes; + private String partition; + + public PlacementSet(N nextAvailable, Map allSchedulable, + String partition) { + this.nextAvailable = nextAvailable; + this.allSchedulableNodes = allSchedulable; + this.partition = partition; + } + + /* + * "I don't care, just give me next node to allocate" + */ + public N getNextAvailable() { + return nextAvailable; + } + + /* + * "I'm picky, give me all you have and I will decide" + */ + public Map getAllSchedulableNodes() { + return allSchedulableNodes; + } + + public String getPartition() { + return partition; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 67d93a4..ae9ee90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -18,12 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -55,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; @@ -66,11 +62,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -495,7 +496,7 @@ public LeafQueue getCSLeafQueue() { } public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits currentResourceLimits, + PlacementSet placementSet, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, RMContainer reservedContainer) { if (LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " @@ -504,7 +505,7 @@ public CSAssignment assignContainers(Resource clusterResource, } synchronized (this) { - return containerAllocator.assignContainers(clusterResource, node, + return containerAllocator.assignContainers(clusterResource, placementSet, schedulingMode, currentResourceLimits, reservedContainer); } } @@ -587,7 +588,8 @@ public void updateAppSkipNodeDiagnostics(String message) { this.appSkipNodeDiagnostics = message; } - public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) { + public void updateNodeInfoForAMDiagnostics( + PlacementSet candidates) { if (isWaitingForAMContainer()) { StringBuilder diagnosticMessageBldr = new StringBuilder(); if (appSkipNodeDiagnostics != null) { @@ -596,15 +598,26 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) { } diagnosticMessageBldr.append( CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG); - diagnosticMessageBldr.append(node.getNodeID()); - diagnosticMessageBldr.append(" ( Partition : "); - diagnosticMessageBldr.append(node.getLabels()); - diagnosticMessageBldr.append(", Total resource : "); - diagnosticMessageBldr.append(node.getTotalResource()); - diagnosticMessageBldr.append(", Available resource : "); - diagnosticMessageBldr.append(node.getUnallocatedResource()); - diagnosticMessageBldr.append(" )."); + + SchedulerNode node = candidates.getNextAvailable(); + + // TODO, fix this when global scheduling enabled. 
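The null guard above matters because under global scheduling there may be no single "last node processed" to report. A minimal sketch of the null-safe diagnostics construction; the message prefix below is hypothetical, the real text comes from CSAMContainerLaunchDiagnosticsConstants:

public class AmDiagnosticsDemo {
    static String buildDiagnostics(String lastNodeId, String labels,
            String total, String available) {
        // Hypothetical prefix standing in for LAST_NODE_PROCESSED_MSG.
        StringBuilder sb = new StringBuilder("Last node processed: ");
        if (lastNodeId != null) {
            sb.append(lastNodeId)
                .append(" ( Partition : ").append(labels)
                .append(", Total resource : ").append(total)
                .append(", Available resource : ").append(available)
                .append(" ).");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildDiagnostics("h1:1234", "[]",
            "<memory:8192, vCores:8>", "<memory:6144, vCores:6>"));
        // With global scheduling there may be no node to report at all.
        System.out.println(buildDiagnostics(null, null, null, null));
    }
}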
+ if (null != node) { + diagnosticMessageBldr.append(node.getNodeID()); + diagnosticMessageBldr.append(" ( Partition : "); + diagnosticMessageBldr.append(node.getLabels()); + diagnosticMessageBldr.append(", Total resource : "); + diagnosticMessageBldr.append(node.getTotalResource()); + diagnosticMessageBldr.append(", Available resource : "); + diagnosticMessageBldr.append(node.getUnallocatedResource()); + diagnosticMessageBldr.append(" )."); + } + updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString()); } } + + public CapacitySchedulerContext getCapacitySchedulerContext() { + return capacitySchedulerContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/AbstractSchedulerNodesScorer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/AbstractSchedulerNodesScorer.java new file mode 100644 index 0000000..f8af71f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/AbstractSchedulerNodesScorer.java @@ -0,0 +1,24 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public abstract class AbstractSchedulerNodesScorer + implements SchedulerNodesScorer { + SchedulerApplicationAttempt attempt; + SchedulerRequestKey schedulerKey; + ReentrantReadWriteLock.ReadLock readLock; + ReentrantReadWriteLock.WriteLock writeLock; + + AbstractSchedulerNodesScorer(SchedulerApplicationAttempt attempt, + SchedulerRequestKey schedulerKey) { + this.attempt = attempt; + this.schedulerKey = schedulerKey; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/DoNotCareNodesScorer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/DoNotCareNodesScorer.java new file mode 100644 index 0000000..8074814 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/DoNotCareNodesScorer.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; + +import java.util.Iterator; + +public class DoNotCareNodesScorer + implements SchedulerNodesScorer { + @Override + public Iterator scorePlacementSet( + PlacementSet candidates) { + return candidates.getAllSchedulableNodes().values().iterator(); + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/LocalityNodesScorer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/LocalityNodesScorer.java new file mode 100644 index 0000000..61777a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/LocalityNodesScorer.java @@ -0,0 +1,120 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; + +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class LocalityNodesScorer + extends AbstractSchedulerNodesScorer { + private long lastInitializedTime = 0; + + private ConcurrentLinkedQueue nodeLocalHosts; + private ConcurrentLinkedQueue rackLocalHosts; + private ConcurrentLinkedQueue offswitchHosts; + + public LocalityNodesScorer(SchedulerApplicationAttempt attempt, + SchedulerRequestKey schedulerKey) { + super(attempt, schedulerKey); + } + + private void reinitializeIfNeeded(PlacementSet candidates) { + // Do not reinitialize in 5000 ms. + // FIXME: this should be configurable and will be forced to update when + // Requirement changes, etc. 
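The timestamp check that follows implements a simple refresh guard: the scorer's locality buckets are recomputed at most once per interval, hard-coded to 5 seconds in this patch. A self-contained sketch of the same guard:

public class RefreshGuardDemo {
    private static final long REFRESH_INTERVAL_MS = 5000L;
    private long lastInitializedTime = 0L;

    // Mirrors "System.currentTimeMillis() - 5000L < lastInitializedTime":
    // returns true only when the cached buckets are stale enough to rebuild.
    synchronized boolean shouldReinitialize() {
        long now = System.currentTimeMillis();
        if (now - REFRESH_INTERVAL_MS < lastInitializedTime) {
            return false; // refreshed recently, reuse the cached buckets
        }
        lastInitializedTime = now;
        return true;
    }

    public static void main(String[] args) {
        RefreshGuardDemo guard = new RefreshGuardDemo();
        System.out.println(guard.shouldReinitialize()); // true (first call)
        System.out.println(guard.shouldReinitialize()); // false (within 5s)
    }
}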
+ if (System.currentTimeMillis() - 5000L < lastInitializedTime) { + return; + } + + lastInitializedTime = System.currentTimeMillis(); + + try { + writeLock.lock(); + if (null == nodeLocalHosts) { + nodeLocalHosts = new ConcurrentLinkedQueue<>(); + rackLocalHosts = new ConcurrentLinkedQueue<>(); + offswitchHosts = new ConcurrentLinkedQueue<>(); + } else { + nodeLocalHosts.clear(); + rackLocalHosts.clear(); + offswitchHosts.clear(); + } + + // We don't need any resource + boolean needResource = attempt.getResourceRequest(schedulerKey, + ResourceRequest.ANY).getNumContainers() > 0; + if (!needResource) { + return; + } + + for (Map.Entry entry : candidates.getAllSchedulableNodes() + .entrySet()) { + NodeId nodeId = entry.getKey(); + N node = entry.getValue(); + String rack = node.getRackName(); + + ResourceRequest rr = attempt.getAppSchedulingInfo().getResourceRequest( + schedulerKey, nodeId.getHost()); + if (rr != null && rr.getNumContainers() > 0) { + nodeLocalHosts.add(node); + } else { + rr = attempt.getAppSchedulingInfo().getResourceRequest(schedulerKey, + rack); + boolean hasRackLocalRequest = rr != null && rr.getNumContainers() > 0; + if (hasRackLocalRequest) { + rackLocalHosts.add(node); + } else { + offswitchHosts.add(node); + } + } + } + } finally { + writeLock.unlock(); + } + } + + private void moveFirstToLast(ConcurrentLinkedQueue queue) { + N n = null; + try { + n = queue.poll(); + } catch (NoSuchElementException e) { + // do nothing; + } + + if (n != null) { + queue.offer(n); + } + } + + @Override + public Iterator scorePlacementSet( + PlacementSet candidates) { + reinitializeIfNeeded(candidates); + + try { + writeLock.lock(); + moveFirstToLast(nodeLocalHosts); + moveFirstToLast(rackLocalHosts); + moveFirstToLast(offswitchHosts); + } finally { + writeLock.unlock(); + } + + try { + readLock.lock(); + return IteratorUtils.chainedIterator( + new Iterator[] { nodeLocalHosts.iterator(), rackLocalHosts.iterator(), + offswitchHosts.iterator() }); + } finally { + readLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorer.java new file mode 100644 index 0000000..8a89e40 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; + +import java.util.Iterator; + +public interface SchedulerNodesScorer { + /** + * Score nodes according to given placement set. + * @param placementSet + * @return sorted nodes based on goodness + */ + Iterator scorePlacementSet(PlacementSet placementSet); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java new file mode 100644 index 0000000..41f222c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java @@ -0,0 +1,70 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.commons.collections.map.LRUMap; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; + +import java.util.Map; + +/** + * Do necessary caching for scorer according to type and applications + */ +public class SchedulerNodesScorerCache { + // At most store 10K objects + private static LRUMap lruCache = new LRUMap(1024 * 10); + + private static SchedulerNodesScorerType getSchedulerNodesScorerType( + SchedulerApplicationAttempt attempt, SchedulerRequestKey schedulerKey) { + Map requests = attempt.getResourceRequests( + schedulerKey); + + // Simplest rule to determine with nodes scorer will be used: + // When requested #resourceName > 0, use locality, otherwise use DO_NOT_CARE + if (requests != null && requests.size() > 1) { + return SchedulerNodesScorerType.LOCALITY; + } + + return SchedulerNodesScorerType.ANY; + } + + public static SchedulerNodesScorer getOrCreateScorer( + SchedulerApplicationAttempt attempt, SchedulerRequestKey schedulerKey) { + SchedulerNodesScorerType type = getSchedulerNodesScorerType(attempt, + schedulerKey); + + return getOrCreateScorer(attempt, schedulerKey, type); + } + + public static SchedulerNodesScorer getOrCreateScorer( + SchedulerApplicationAttempt attempt, SchedulerRequestKey schedulerKey, + SchedulerNodesScorerType type) { + String key = attempt.getApplicationAttemptId().toString() + schedulerKey + .getPriority().toString(); + SchedulerNodesScorer scorer; + // scorer = (SchedulerNodesScorer) lruCache.get(key); + // FIXME: need to correctly compare if we need to update + scorer = null; + + if (null == scorer) { + // FIXME, for simple, create scorer every time. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java
new file mode 100644
index 0000000..41f222c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java
@@ -0,0 +1,70 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+import java.util.Map;
+
+/**
+ * Caches scorers per application attempt, priority and scorer type.
+ */
+public class SchedulerNodesScorerCache {
+  // Store at most 10K scorers; least-recently-used entries are evicted.
+  private static LRUMap lruCache = new LRUMap(1024 * 10);
+
+  private static SchedulerNodesScorerType getSchedulerNodesScorerType(
+      SchedulerApplicationAttempt attempt, SchedulerRequestKey schedulerKey) {
+    Map<String, ResourceRequest> requests = attempt.getResourceRequests(
+        schedulerKey);
+
+    // Simplest rule to determine which nodes scorer will be used: when the
+    // request contains resource names besides ANY (i.e. size > 1), use
+    // LOCALITY; otherwise any node is as good as another, so use ANY.
+    if (requests != null && requests.size() > 1) {
+      return SchedulerNodesScorerType.LOCALITY;
+    }
+
+    return SchedulerNodesScorerType.ANY;
+  }
+
+  public static SchedulerNodesScorer getOrCreateScorer(
+      SchedulerApplicationAttempt attempt, SchedulerRequestKey schedulerKey) {
+    SchedulerNodesScorerType type = getSchedulerNodesScorerType(attempt,
+        schedulerKey);
+
+    return getOrCreateScorer(attempt, schedulerKey, type);
+  }
+
+  public static SchedulerNodesScorer getOrCreateScorer(
+      SchedulerApplicationAttempt attempt, SchedulerRequestKey schedulerKey,
+      SchedulerNodesScorerType type) {
+    String key = attempt.getApplicationAttemptId().toString() + schedulerKey
+        .getPriority().toString();
+    SchedulerNodesScorer scorer;
+    // scorer = (SchedulerNodesScorer) lruCache.get(key);
+    // FIXME: need a correct staleness check before the cached lookup above
+    // can be re-enabled.
+    scorer = null;
+
+    if (null == scorer) {
+      // FIXME: for simplicity, create a scorer every time; once the
+      // staleness check exists, cached scorers can be reused safely.
+      switch (type) {
+      case LOCALITY:
+        scorer = new LocalityNodesScorer<>(attempt, schedulerKey);
+        break;
+      case ANY:
+        scorer = new DoNotCareNodesScorer<>();
+        break;
+      default:
+        return null;
+      }
+
+      lruCache.put(key, scorer);
+    }
+
+    return scorer;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java
new file mode 100644
index 0000000..db9c6fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java
@@ -0,0 +1,6 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer;
+
+public enum SchedulerNodesScorerType {
+  ANY, // Any node is fine
+  LOCALITY, // Locality-based ordering
+}
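The LRUMap above holds at most 10,240 scorers and evicts the least-recently-used entry when full; a get() counts as a use. A self-contained demonstration of that eviction behavior, with a tiny capacity for illustration:

import org.apache.commons.collections.map.LRUMap;

public class LruEvictionDemo {
  public static void main(String[] args) {
    LRUMap cache = new LRUMap(2); // capacity 2 instead of 1024 * 10
    cache.put("a", 1);
    cache.put("b", 2);
    cache.get("a");    // touching "a" makes "b" the least-recently-used entry
    cache.put("c", 3); // evicts "b"
    System.out.println(cache.containsKey("b")); // false
    System.out.println(cache.containsKey("a")); // true
  }
}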
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSGlobalScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSGlobalScheduling.java
new file mode 100644
index 0000000..aae5a7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSGlobalScheduling.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+// TODO: flesh out the tests here.
+public class TestCSGlobalScheduling {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  @Test
+  public void testSimpleGlobalScheduling() throws Exception {
+    // enable global scheduling
+    conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULE_GLOBALLY_ENABLE,
+        true);
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(conf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8000);
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000);
+
+    // Launch an app and submit a mix of node-local (h1, h2), rack-local
+    // (/default-rack) and off-switch (*) asks, to exercise the
+    // locality-aware global scheduling path.
+    RMApp app1 = rm1.submitApp(200, "app", "user", null);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
+
+    am1.allocate(Arrays.asList(
+        ResourceRequest.newInstance(Priority.create(0), "*",
+            Resources.createResource(1024), 3),
+        ResourceRequest.newInstance(Priority.create(1), "h1",
+            Resources.createResource(1024), 3),
+        ResourceRequest.newInstance(Priority.create(1), "h2",
+            Resources.createResource(1024), 1),
+        ResourceRequest.newInstance(Priority.create(1), "/default-rack",
+            Resources.createResource(1024), 4),
+        ResourceRequest.newInstance(Priority.create(1), "*",
+            Resources.createResource(1024), 4)), null);
+
+    // request a container.
+    // am1.allocate("*", 1024, 1, new ArrayList());
+
+    // FIXME: placeholder wait while the test is being written; replace with
+    // assertions on the allocated containers.
+    Thread.sleep(1000000);
+
+    rm1.close();
+  }
+}
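One possible shape for the assertions the TODO leaves open, sketched with the MockAM heartbeat-style allocate used elsewhere in these RM tests; the expected count and hosts are illustrative only, not verified against this patch:

// Hypothetical follow-up replacing the placeholder sleep; assumes
// java.util.List/ArrayList plus Container, ContainerId and AllocateResponse
// from org.apache.hadoop.yarn.api are imported.
List<Container> allocated = new ArrayList<>();
while (allocated.size() < 3) {
  // empty ask = pure heartbeat; pick up whatever has been allocated so far
  AllocateResponse response = am1.allocate(
      new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
  allocated.addAll(response.getAllocatedContainers());
  Thread.sleep(100);
}
for (Container c : allocated) {
  // e.g. expect the node-local asks on h1 to be satisfied on h1 first
  System.out.println(c.getNodeId() + " <- " + c.getPriority());
}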
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index d3567f5..67c9446 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -789,7 +789,7 @@ public void testAllocateReorder() throws Exception {
         null, null, null, null);
 
     //And this will result in container assignment for app1
-    CapacityScheduler.schedule(cs);
+    CapacityScheduler.AsyncScheduleThread.asyncSchedule(cs);
 
     //Verify that app1 is still first in assignment order
     //This happens because app2 has no demand/a magnitude of NaN, which
@@ -993,7 +993,7 @@ public void testAsyncScheduling() throws Exception {
 
     // Now directly exercise the scheduling loop
     for (int i=0; i < NODES; ++i) {
-      CapacityScheduler.schedule(cs);
+      CapacityScheduler.AsyncScheduleThread.asyncSchedule(cs);
     }
   }
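Taken together, switching a test onto this path is just configuration plus the relocated entry point; a minimal sketch, assuming SCHEDULE_GLOBALLY_ENABLE is the boolean key this patch adds to CapacitySchedulerConfiguration:

YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
    ResourceScheduler.class);
conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULE_GLOBALLY_ENABLE, true);
MockRM rm = new MockRM(conf);
rm.start();
// Drive one scheduling pass through the new async entry point, as the
// updated TestCapacityScheduler hunks above now do:
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
CapacityScheduler.AsyncScheduleThread.asyncSchedule(cs);
rm.close();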