diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index cfec915..4fc831e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -798,14 +798,24 @@ public void updateDemand() {
     Resources.addTo(demand, getCurrentConsumption());
 
     // Add up outstanding resource requests
-    synchronized (this) {
-      for (Priority p : getPriorities()) {
-        for (ResourceRequest r : getResourceRequests(p).values()) {
-          Resources.multiplyAndAddTo(demand,
-              r.getCapability(), r.getNumContainers());
-        }
+    Resources.addTo(demand, getOutstandingDemand());
+  }
+
+  /**
+   * Adds up capability times container count over the resource requests
+   * of every priority of this attempt.
+   *
+   * @return a newly created Resource holding the outstanding demand
+   */
+  public synchronized Resource getOutstandingDemand() {
+    Resource outstandingDemand = Resources.createResource(0);
+    for (Priority p : getPriorities()) {
+      for (ResourceRequest r : getResourceRequests(p).values()) {
+        Resources.multiplyAndAddTo(outstandingDemand,
+            r.getCapability(), r.getNumContainers());
       }
     }
+    return outstandingDemand;
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index f90a198..0fa4939 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -303,6 +303,30 @@ private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) {
     demand = Resources.componentwiseMin(demand, maxRes);
   }
 
+  /**
+   * Adds up the outstanding demand of this queue's runnable apps, capped
+   * component-wise at the queue's configured maximum resources.
+   *
+   * @return the capped outstanding demand of the runnable apps
+   */
+  public Resource getOutstandingDemandOfRunnableApps() {
+    Resource maxRes = scheduler.getAllocationConfiguration()
+        .getMaxResources(getName());
+    Resource outstandingDemand = Resources.createResource(0);
+    // NOTE(review): getOutstandingDemand() is declared on FSAppAttempt, so
+    // iterate that type; confirm the runnable-app list field name and whether
+    // it must be read under this queue's lock.
+    for (FSAppAttempt sched : runnableAppScheds) {
+      if (Resources.equals(outstandingDemand, maxRes)) {
+        break;
+      }
+      outstandingDemand = Resources.componentwiseMin(
+          Resources.add(outstandingDemand, sched.getOutstandingDemand()),
+          maxRes);
+    }
+    return outstandingDemand;
+  }
+
   @Override
   public Resource assignContainer(FSSchedulerNode node) {
     Resource assigned = Resources.none();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3eefb8f..f7664ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -176,6 +176,7 @@
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
+  protected boolean demandBlockAmEnabled; // AM is blocked if queue has demand
   protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
   private Comparator nodeAvailableResourceComparator =
           new NodeAvailableResourceComparator(); // Node available resource comparator
@@ -239,6 +240,20 @@ private void validateConf(Configuration conf) {
         + "=" + maxVcores + ", min should equal greater than 0"
         + ", max should be no smaller than min.");
     }
+
+    // Apps held back by demand-block-am are presumably only re-evaluated from
+    // the continuous scheduling pass, so the former requires the latter.
+    boolean blockAmByDemand = conf.getBoolean(
+        FairSchedulerConfiguration.DEMAND_BLOCK_AM_ENABLED,
+        FairSchedulerConfiguration.DEFAULT_DEMAND_BLOCK_AM_ENABLED);
+    boolean continuousScheduling = conf.getBoolean(
+        FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+        FairSchedulerConfiguration.DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
+    if (blockAmByDemand && !continuousScheduling) {
+      throw new YarnRuntimeException(
+          "continuous-scheduling-enabled must be true if "
+          + "demand-block-am-enabled is true");
+    }
   }
 
   public FairSchedulerConfiguration getConf() {
@@ -566,6 +581,11 @@ public boolean isContinuousSchedulingEnabled() {
     return continuousSchedulingEnabled;
   }
 
+  /** @return whether blocking of ApplicationMasters by demand is enabled */
+  public boolean isDemandBlockAmEnabled() {
+    return demandBlockAmEnabled;
+  }
+
   public synchronized int getContinuousSchedulingSleepMs() {
     return continuousSchedulingSleepMs;
   }
@@ -669,7 +689,11 @@ protected synchronized void addApplicationAttempt(
     }
     application.setCurrentAppAttempt(attempt);
 
-    boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+    // With demand-block-am enabled every attempt starts out non-runnable
+    // (presumably promoted later via updateRunnabilityOnReload()). It must
+    // still be added to the queue and tracked by the enforcer below.
+    boolean runnable = !demandBlockAmEnabled
+        && maxRunningEnforcer.canAppBeRunnable(queue, user);
     queue.addApp(attempt, runnable);
     if (runnable) {
       maxRunningEnforcer.trackRunnableApp(attempt);
@@ -1041,6 +1065,14 @@ void continuousSchedulingAttempt() throws InterruptedException {
 
     long duration = getClock().getTime() - start;
     fsOpDurations.addContinuousSchedulingRunDuration(duration);
+
+    // Re-evaluate runnability on every pass (presumably promoting apps that
+    // were held back); the enforcer is updated under the scheduler lock.
+    if (demandBlockAmEnabled) {
+      synchronized (this) {
+        maxRunningEnforcer.updateRunnabilityOnReload();
+      }
+    }
   }
 
   /** Sort nodes by available resource */
@@ -1314,6 +1346,8 @@ private void initScheduler(Configuration conf) throws IOException {
       continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
       continuousSchedulingSleepMs =
               this.conf.getContinuousSchedulingSleepMs();
+      demandBlockAmEnabled = this.conf.isDemandBlockAmEnabled();
+      maxRunningEnforcer.setDemandBlockAmEnabled(demandBlockAmEnabled);
       nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
       rackLocalityThreshold = this.conf.getLocalityThresholdRack();
       nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index e477e6e..862196e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -95,6 +95,10 @@
   protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled";
   protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
 
+  /** Enable blocking ApplicationMaster by demand. */
+  protected static final String DEMAND_BLOCK_AM_ENABLED = CONF_PREFIX + "demand-block-am-enabled";
+  protected static final boolean DEFAULT_DEMAND_BLOCK_AM_ENABLED = false;
+
   /** Sleep time of each pass in continuous scheduling (5ms in default) */
   protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms";
   protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
@@ -179,6 +183,10 @@ public boolean isContinuousSchedulingEnabled() {
     return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
   }
 
+  public boolean isDemandBlockAmEnabled() {
+    return getBoolean(DEMAND_BLOCK_AM_ENABLED, DEFAULT_DEMAND_BLOCK_AM_ENABLED);
+  }
+
   public int getContinuousSchedulingSleepMs() {
     return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
index f750438..b2f2cc4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
@@ -30,6 +30,9 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 
 /**
  * Handles tracking and enforcement for user and queue maxRunningApps
@@ -39,6 +42,7 @@
   private static final Log LOG = LogFactory.getLog(FairScheduler.class);
 
   private final FairScheduler scheduler;
+  private boolean demandBlockAmEnabled;
 
   // Tracks the number of running applications by user.
   private final Map usersNumRunnableApps;
@@ -51,6 +55,11 @@ public MaxRunningAppsEnforcer(FairScheduler scheduler) {
     this.usersNonRunnableApps = ArrayListMultimap.create();
   }
 
+  /** Turns blocking of new apps by outstanding demand on or off. */
+  public void setDemandBlockAmEnabled(boolean enabled) {
+    demandBlockAmEnabled = enabled;
+  }
+
   /**
    * Checks whether making the application runnable would exceed any
    * maxRunningApps limits.
@@ -64,6 +73,10 @@ public boolean canAppBeRunnable(FSQueue queue, String user) {
     if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
       return false;
     }
+
+    // Keep the submission queue: the loop below advances 'queue' to null.
+    final FSQueue appQueue = queue;
+
     // Check queue and all parent queues
     while (queue != null) {
       int queueMaxApps = allocConf.getQueueMaxApps(queue.getName());
@@ -73,9 +86,66 @@ public boolean canAppBeRunnable(FSQueue queue, String user) {
       queue = queue.getParent();
     }
 
+    if (demandBlockAmEnabled && isDemandBlockingApp(appQueue)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Demand is blocking ApplicationMaster");
+      }
+      return false;
+    }
+
     return true;
   }
 
+  /**
+   * Checks whether outstanding demand should keep a new app in the given
+   * queue from becoming runnable. Starting at the queue itself, walks up to
+   * the first queue whose configured max resources are not unbounded and
+   * searches the subtree below it for outstanding demand.
+   *
+   * @param queue the queue the app is submitted to; must not be null
+   * @return true if a leaf queue of sufficient weight has outstanding demand
+   */
+  private boolean isDemandBlockingApp(FSQueue queue) {
+    ResourceWeights targetWeight = queue.getWeights();
+
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+    while (allocConf.getMaxResources(queue.getName())
+        .equals(Resources.unbounded())) {
+      queue = queue.getParent();
+      if (queue == null) {
+        // No bounded queue on the path to the root: nothing can block.
+        return false;
+      }
+    }
+
+    return hasHigherWeightOutstandingDemand(queue, targetWeight);
+  }
+
+  /**
+   * Recursively searches the subtree rooted at the given queue for a leaf
+   * queue whose memory weight is at least the target's and whose runnable
+   * apps have outstanding demand.
+   */
+  private boolean hasHigherWeightOutstandingDemand(FSQueue queue,
+      ResourceWeights targetWeight) {
+    if (queue instanceof FSLeafQueue) {
+      ResourceWeights weight = queue.getWeights();
+      if (weight.getWeight(ResourceType.MEMORY)
+          < targetWeight.getWeight(ResourceType.MEMORY)) {
+        return false;
+      }
+      FSLeafQueue leaf = (FSLeafQueue) queue;
+      return !leaf.getOutstandingDemandOfRunnableApps()
+          .equals(Resources.none());
+    }
+    for (FSQueue childQueue : queue.getChildQueues()) {
+      if (hasHigherWeightOutstandingDemand(childQueue, targetWeight)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Tracks the given new runnable app for purposes of maintaining max running
    * app limits.