diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 462e02add1..b58b493cb9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -176,6 +176,24 @@ public static Resource subtract(Resource lhs, Resource rhs) {
     return subtractFrom(clone(lhs), rhs);
   }
 
+  /**
+   * Subtract rhs from lhs and reset any negative
+   * values to zero.
+   * @param lhs {@link Resource} to subtract from
+   * @param rhs {@link Resource} to subtract
+   * @return the value of lhs after subtraction
+   */
+  public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
+    subtractFrom(lhs, rhs);
+    if (lhs.getMemorySize() < 0) {
+      lhs.setMemorySize(0);
+    }
+    if (lhs.getVirtualCores() < 0) {
+      lhs.setVirtualCores(0);
+    }
+    return lhs;
+  }
+
   public static Resource negate(Resource resource) {
     return subtract(NONE, resource);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 64427b762e..ce6d2a2fc5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -127,6 +127,7 @@
    */
   protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected int nmExpireInterval;
+  protected long nmHeartbeatInterval;
 
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
@@ -163,6 +164,9 @@ public void serviceInit(Configuration conf) throws Exception {
     nmExpireInterval =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    nmHeartbeatInterval =
+        conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
     long configuredMaximumAllocationWaitTime =
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
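Note: the new Resources.subtractFromNonNegative() mutates lhs in place and clamps both dimensions at zero, which is what the scheduler changes below rely on when bookkeeping would otherwise go negative. A minimal usage sketch (the resource values are illustrative, not taken from the patch):

    Resource lhs = Resource.newInstance(2048, 1);   // 2048 MB, 1 vcore
    Resource rhs = Resource.newInstance(4096, 4);   // larger on both dimensions
    Resources.subtractFromNonNegative(lhs, rhs);
    // subtractFrom() alone would leave <-2048 MB, -3 vcores>; here both are reset to 0.
    assert lhs.getMemorySize() == 0 && lhs.getVirtualCores() == 0;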
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 9e57fa7529..9e90d661dc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.text.DecimalFormat;
@@ -58,6 +59,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -87,6 +89,7 @@
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
   private Resource fairshareStarvation = Resources.none();
   private long lastTimeAtFairShare;
+  private long nextStarvationCheck;
 
   // minShareStarvation attributed to this application by the leaf queue
   private Resource minshareStarvation = Resources.none();
@@ -206,15 +209,9 @@ private void subtractResourcesOnBlacklistedNodes(
       blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
     }
     for (FSSchedulerNode node: blacklistNodeIds) {
-      Resources.subtractFrom(availableResources,
+      Resources.subtractFromNonNegative(availableResources,
           node.getUnallocatedResource());
     }
-    if (availableResources.getMemorySize() < 0) {
-      availableResources.setMemorySize(0);
-    }
-    if (availableResources.getVirtualCores() < 0) {
-      availableResources.setVirtualCores(0);
-    }
   }
 
   /**
@@ -525,6 +522,15 @@ Resource getStarvation() {
   }
 
   /**
+   * Get last computed fairshare starvation.
+   *
+   * @return last computed fairshare starvation
+   */
+  Resource getFairshareStarvation() {
+    return fairshareStarvation;
+  }
+
+  /**
    * Set the minshare attributed to this application. To be called only from
    * {@link FSLeafQueue#updateStarvedApps}.
    *
@@ -1072,17 +1078,16 @@ boolean assignReservedContainer(FSSchedulerNode node) {
   }
 
   /**
-   * Helper method that computes the extent of fairshare fairshareStarvation.
+   * Helper method that computes the extent of fairshare starvation.
    */
   Resource fairShareStarvation() {
     Resource threshold = Resources.multiply(
         getFairShare(), fsQueue.getFairSharePreemptionThreshold());
-    Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+    Resource starvation = Resources.componentwiseMin(threshold, demand);
+    Resources.subtractFromNonNegative(starvation, getResourceUsage());
 
     long now = scheduler.getClock().getTime();
-    boolean starved = Resources.greaterThan(
-        fsQueue.getPolicy().getResourceCalculator(),
-        scheduler.getClusterResource(), starvation, Resources.none());
+    boolean starved = !Resources.isNone(starvation);
 
     if (!starved) {
       lastTimeAtFairShare = now;
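Note: with this change an app's fairshare starvation is capped by its demand: starvation = componentwiseMin(fairShare * preemptionThreshold, demand) - usage, clamped at zero per dimension. A rough worked example with made-up numbers:

    fair share = <8192 MB, 8 vcores>, preemption threshold = 0.5  ->  threshold = <4096 MB, 4 vcores>
    demand     = <3072 MB, 6 vcores>  ->  componentwiseMin(threshold, demand) = <3072 MB, 4 vcores>
    usage      = <2048 MB, 4 vcores>  ->  starvation = <1024 MB, 0 vcores>, not none(), so the app is starved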
@@ -1106,6 +1111,97 @@ boolean isStarvedForFairShare() {
     return !Resources.isNone(fairshareStarvation);
   }
 
+  /**
+   * Helper method for {@link #getStarvedResourceRequests()}:
+   * Given a map of visited {@link ResourceRequest}s, it checks if
+   * {@link ResourceRequest} 'rr' has already been visited. The map is updated
+   * to reflect visiting 'rr'.
+   */
+  private static boolean checkAndMarkRRVisited(
+      Map<Priority, List<Resource>> visitedRRs, ResourceRequest rr) {
+    Priority priority = rr.getPriority();
+    Resource capability = rr.getCapability();
+    if (visitedRRs.containsKey(priority)) {
+      List<Resource> rrList = visitedRRs.get(priority);
+      if (rrList.contains(capability)) {
+        return true;
+      } else {
+        rrList.add(capability);
+        return false;
+      }
+    } else {
+      List<Resource> newRRList = new ArrayList<>();
+      newRRList.add(capability);
+      visitedRRs.put(priority, newRRList);
+      return false;
+    }
+  }
+
+  /**
+   * Fetch a list of RRs corresponding to the extent the app is starved
+   * (fairshare and minshare). This method considers the number of containers
+   * in a RR and also only one locality-level (the first encountered
+   * resourceName).
+   *
+   * @return list of {@link ResourceRequest}s corresponding to the amount of
+   * starvation.
+   */
+  List<ResourceRequest> getStarvedResourceRequests() {
+    List<ResourceRequest> ret = new ArrayList<>();
+    Map<Priority, List<Resource>> visitedRRs = new HashMap<>();
+
+    Resource pending = getStarvation();
+    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+      if (Resources.isNone(pending)) {
+        break;
+      }
+      if (checkAndMarkRRVisited(visitedRRs, rr)) {
+        continue;
+      }
+
+      // Compute the number of containers of this capability that fit in the
+      // pending amount
+      int ratio = (int) Math.floor(
+          Resources.ratio(scheduler.getResourceCalculator(),
+              pending, rr.getCapability()));
+      if (ratio == 0) {
+        continue;
+      }
+
+      // If the RR is only partially being satisfied, include only the
+      // partial number of containers.
+      if (ratio < rr.getNumContainers()) {
+        rr = ResourceRequest.newInstance(
+            rr.getPriority(), rr.getResourceName(), rr.getCapability(), ratio);
+      }
+      ret.add(rr);
+      Resources.subtractFromNonNegative(pending,
+          Resources.multiply(rr.getCapability(), ratio));
+    }
+
+    return ret;
+  }
+
+  /**
+   * Notify this app that preemption has been triggered to make room for
+   * outstanding demand. The app should not be considered starved until after
+   * the specified delay.
+   *
+   * @param delayBeforeNextStarvationCheck duration to wait
+   */
+  void preemptionTriggered(long delayBeforeNextStarvationCheck) {
+    nextStarvationCheck =
+        scheduler.getClock().getTime() + delayBeforeNextStarvationCheck;
+  }
+
+  /**
+   * Whether this app's starvation should be considered.
+   */
+  boolean shouldCheckForStarvation() {
+    long now = scheduler.getClock().getTime();
+    return now > nextStarvationCheck;
+  }
+
   /* Schedulable methods implementation */
 
   @Override
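Note: getStarvedResourceRequests() converts the pending starvation into whole containers per request. Under the DefaultResourceCalculator, Resources.ratio() compares memory only, so with an illustrative pending starvation of 5120 MB and a request for 4 containers of 2048 MB each, ratio = floor(5120 / 2048) = 2: the request is trimmed to 2 containers, 2 x 2048 MB is deducted from pending, and the loop moves on to the next request.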
@@ -1118,6 +1214,13 @@ public Resource getDemand() {
     return demand;
   }
 
+  /**
+   * Get the current app's unsatisfied demand.
+   */
+  Resource getPendingDemand() {
+    return Resources.subtract(demand, getResourceUsage());
+  }
+
   @Override
   public long getStartTime() {
     return startTime;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 16070e0384..da88c5400f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -220,6 +220,62 @@ public void updateInternal(boolean checkStarvation) {
   }
 
   /**
+   * Compute the extent of fairshare starvation for a set of apps.
+   *
+   * @param appsWithDemand apps to compute fairshare starvation for
+   * @return aggregate fairshare starvation for all apps
+   */
+  private Resource updateStarvedAppsFairshare(
+      TreeSet<FSAppAttempt> appsWithDemand) {
+    Resource fairShareStarvation = Resources.clone(none());
+    // Fetch apps with unmet demand sorted by fairshare starvation
+    for (FSAppAttempt app : appsWithDemand) {
+      Resource appStarvation = app.fairShareStarvation();
+      if (!Resources.isNone(appStarvation)) {
+        context.getStarvedApps().addStarvedApp(app);
+        Resources.addTo(fairShareStarvation, appStarvation);
+      } else {
+        break;
+      }
+    }
+    return fairShareStarvation;
+  }
+
+  /**
+   * Distribute minshare starvation to a set of apps.
+   * @param appsWithDemand set of apps
+   * @param minShareStarvation minshare starvation to distribute
+   */
+  private void updateStarvedAppsMinshare(
+      TreeSet<FSAppAttempt> appsWithDemand, Resource minShareStarvation) {
+    // Keep adding apps to the starved list until the unmet demand goes over
+    // the remaining minshare
+    for (FSAppAttempt app : appsWithDemand) {
+      if (!Resources.isNone(minShareStarvation)) {
+        Resource appMinShare = app.getPendingDemand();
+        Resources.subtractFromNonNegative(
+            appMinShare, app.getFairshareStarvation());
+
+        if (Resources.greaterThan(policy.getResourceCalculator(),
+            scheduler.getClusterResource(),
+            appMinShare, minShareStarvation)) {
+          Resources.subtractFromNonNegative(
+              appMinShare, minShareStarvation);
+          minShareStarvation = none();
+        } else {
+          Resources.subtractFrom(minShareStarvation, appMinShare);
+        }
+        app.setMinshareStarvation(appMinShare);
+        context.getStarvedApps().addStarvedApp(app);
+      } else {
+        // Reset minshare starvation in case we had set it in a previous
+        // iteration
+        app.resetMinshareStarvation();
+      }
+    }
+  }
+
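Note: updateStarvedAppsMinshare() charges each app only for demand not already counted as fairshare starvation: appMinShare starts as getPendingDemand() minus the app's fairshare starvation. For example (numbers made up), an app using 2 GB of a 6 GB demand has 4 GB pending; if 1 GB of that is already fairshare starvation, at most 3 GB can be attributed as minshare starvation. Once the queue's remaining minshare starvation is exhausted, later apps in the set have their minshare starvation reset.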
+  /**
    * Helper method to identify starved applications. This needs to be called
    * ONLY from {@link #updateInternal}, after the application shares
    * are updated.
    *
@@ -237,44 +293,19 @@ public void updateInternal(boolean checkStarvation) {
    * starved due to fairshare, there might still be starved applications.
    */
   private void updateStarvedApps() {
-    // First identify starved applications and track total amount of
-    // starvation (in resources)
-    Resource fairShareStarvation = Resources.clone(none());
+    // Fetch apps with pending demand
+    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
 
-    // Fetch apps with unmet demand sorted by fairshare starvation
-    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
-    for (FSAppAttempt app : appsWithDemand) {
-      Resource appStarvation = app.fairShareStarvation();
-      if (!Resources.equals(Resources.none(), appStarvation)) {
-        context.getStarvedApps().addStarvedApp(app);
-        Resources.addTo(fairShareStarvation, appStarvation);
-      } else {
-        break;
-      }
-    }
+    // Process apps with fairshare starvation
+    Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand);
 
     // Compute extent of minshare starvation
     Resource minShareStarvation = minShareStarvation();
 
     // Compute minshare starvation that is not subsumed by fairshare starvation
-    Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+    Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation);
 
-    // Keep adding apps to the starved list until the unmet demand goes over
-    // the remaining minshare
-    for (FSAppAttempt app : appsWithDemand) {
-      if (Resources.greaterThan(policy.getResourceCalculator(),
-          scheduler.getClusterResource(), minShareStarvation, none())) {
-        Resource appPendingDemand =
-            Resources.subtract(app.getDemand(), app.getResourceUsage());
-        Resources.subtractFrom(minShareStarvation, appPendingDemand);
-        app.setMinshareStarvation(appPendingDemand);
-        context.getStarvedApps().addStarvedApp(app);
-      } else {
-        // Reset minshare starvation in case we had set it in a previous
-        // iteration
-        app.resetMinshareStarvation();
-      }
-    }
+    updateStarvedAppsMinshare(appsWithDemand, minShareStarvation);
   }
 
   @Override
@@ -352,7 +383,7 @@ public Resource assignContainer(FSSchedulerNode node) {
       return assigned;
     }
 
-    for (FSAppAttempt sched : fetchAppsWithDemand()) {
+    for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
         continue;
       }
@@ -368,14 +399,24 @@ public Resource assignContainer(FSSchedulerNode node) {
     return assigned;
   }
 
-  private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+  /**
+   * Fetch the subset of apps that have unmet demand. When used for
+   * preemption-related code (as opposed to allocation), omits apps that
+   * should not be checked for starvation.
+   *
+   * @param assignment whether the apps are for allocation containers, as
+   *     opposed to preemption calculations
+   * @return Set of apps with unmet demand
+   */
+  private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
     TreeSet<FSAppAttempt> pendingForResourceApps =
         new TreeSet<>(policy.getComparator());
     readLock.lock();
     try {
       for (FSAppAttempt app : runnableApps) {
         Resource pending = app.getAppAttemptResourceUsage().getPending();
-        if (!pending.equals(none())) {
+        if (!Resources.isNone(pending) &&
+            (assignment || app.shouldCheckForStarvation())) {
           pendingForResourceApps.add(app);
         }
       }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index f166878d99..af73c10f79 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
@@ -43,20 +41,26 @@
   protected final FSContext context;
   private final FairScheduler scheduler;
   private final long warnTimeBeforeKill;
+  private final long delayBeforeNextStarvationCheck;
   private final Timer preemptionTimer;
 
   FSPreemptionThread(FairScheduler scheduler) {
+    setDaemon(true);
+    setName("FSPreemptionThread");
     this.scheduler = scheduler;
     this.context = scheduler.getContext();
     FairSchedulerConfiguration fsConf = scheduler.getConf();
     context.setPreemptionEnabled();
     context.setPreemptionUtilizationThreshold(
         fsConf.getPreemptionUtilizationThreshold());
-    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
     preemptionTimer = new Timer("Preemption Timer", true);
-    setDaemon(true);
-    setName("FSPreemptionThread");
+    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+    long allocDelay = (fsConf.isContinuousSchedulingEnabled()
+        ? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs
+        : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
+    delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
+        fsConf.getWaitTimeBeforeNextStarvationCheck();
   }
 
   public void run() {
@@ -64,13 +68,8 @@ public void run() {
       FSAppAttempt starvedApp;
       try{
         starvedApp = context.getStarvedApps().take();
-        if (!Resources.isNone(starvedApp.getStarvation())) {
-          PreemptableContainers containers =
-              identifyContainersToPreempt(starvedApp);
-          if (containers != null) {
-            preemptContainers(containers.containers);
-          }
-        }
+        preemptContainers(identifyContainersToPreempt(starvedApp));
+        starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
       } catch (InterruptedException e) {
         LOG.info("Preemption thread interrupted! Exiting.");
Exiting."); return; @@ -79,58 +78,57 @@ public void run() { } /** - * Given an app, identify containers to preempt to satisfy the app's next - * resource request. + * Given an app, identify containers to preempt to satisfy the app's + * starvation. + * + * Mechanics: + * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of + * starvation. + * 2. For each {@link ResourceRequest}, iterate through matching + * nodes and identify containers to preempt all on one node, also + * optimizing for least number of AM container preemptions. * * @param starvedApp starved application for which we are identifying * preemption targets - * @return list of containers to preempt to satisfy starvedApp, null if the - * app cannot be satisfied by preempting any running containers + * @return list of containers to preempt to satisfy starvedApp */ - private PreemptableContainers identifyContainersToPreempt( + private List identifyContainersToPreempt( FSAppAttempt starvedApp) { - PreemptableContainers bestContainers = null; - - // Find the nodes that match the next resource request - SchedulingPlacementSet nextPs = - starvedApp.getAppSchedulingInfo().getFirstSchedulingPlacementSet(); - PendingAsk firstPendingAsk = nextPs.getPendingAsk(ResourceRequest.ANY); - // TODO (KK): Should we check other resource requests if we can't match - // the first one? - - Resource requestCapability = firstPendingAsk.getPerAllocationResource(); - - List potentialNodes = - scheduler.getNodeTracker().getNodesByResourceName( - nextPs.getAcceptedResouceNames().next().toString()); + List containersToPreempt = new ArrayList<>(); + + // Iterate through enough RRs to address app's starvation + for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) { + for (int i = 0; i < rr.getNumContainers(); i++) { + PreemptableContainers bestContainers = null; + List potentialNodes = scheduler.getNodeTracker() + .getNodesByResourceName(rr.getResourceName()); + for (FSSchedulerNode node : potentialNodes) { + // TODO (YARN-5829): Attempt to reserve the node for starved app. + if (isNodeAlreadyReserved(node, starvedApp)) { + continue; + } - // From the potential nodes, pick a node that has enough containers - // from apps over their fairshare - for (FSSchedulerNode node : potentialNodes) { - // TODO (YARN-5829): Attempt to reserve the node for starved app. The - // subsequent if-check needs to be reworked accordingly. - FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable(); - if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) { - // This node is already reserved by another app. Let us not consider - // this for preemption. - continue; - } + int maxAMContainers = bestContainers == null ? + Integer.MAX_VALUE : bestContainers.numAMContainers; + PreemptableContainers preemptableContainers = + identifyContainersToPreemptOnNode( + rr.getCapability(), node, maxAMContainers); + if (preemptableContainers != null) { + // This set is better than any previously identified set. + bestContainers = preemptableContainers; + if (preemptableContainers.numAMContainers == 0) { + break; + } + } + } // End of iteration through nodes for one RR - int maxAMContainers = bestContainers == null ? 
-          Integer.MAX_VALUE : bestContainers.numAMContainers;
-      PreemptableContainers preemptableContainers =
-          identifyContainersToPreemptOnNode(requestCapability, node,
-              maxAMContainers);
-      if (preemptableContainers != null) {
-        if (preemptableContainers.numAMContainers == 0) {
-          return preemptableContainers;
-        } else {
-          bestContainers = preemptableContainers;
+        if (bestContainers != null && bestContainers.containers.size() > 0) {
+          containersToPreempt.addAll(bestContainers.containers);
+          trackPreemptionsAgainstNode(bestContainers.containers);
         }
       }
-    }
-
-    return bestContainers;
+    } // End of iteration over RRs
+    return containersToPreempt;
   }
 
   /**
@@ -181,23 +179,25 @@ private PreemptableContainers identifyContainersToPreemptOnNode(
     return null;
   }
 
-  private void preemptContainers(List<RMContainer> containers) {
-    // Mark the containers as being considered for preemption on the node.
-    // Make sure the containers are subsequently removed by calling
-    // FSSchedulerNode#removeContainerForPreemption.
-    if (containers.size() > 0) {
-      FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
-          .getNode(containers.get(0).getNodeId());
-      node.addContainersForPreemption(containers);
-    }
+  private boolean isNodeAlreadyReserved(
+      FSSchedulerNode node, FSAppAttempt app) {
+    FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+    return nodeReservedApp != null && !nodeReservedApp.equals(app);
+  }
 
+  private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+    FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
+        .getNode(containers.get(0).getNodeId());
+    node.addContainersForPreemption(containers);
+  }
+
+  private void preemptContainers(List<RMContainer> containers) {
     // Warn application about containers to be killed
     for (RMContainer container : containers) {
       ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
       FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
-      FSLeafQueue queue = app.getQueue();
       LOG.info("Preempting container " + container +
-          " from queue " + queue.getName());
+          " from queue " + app.getQueueName());
       app.trackContainerForPreemption(container);
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 134efff7a0..18806bcbfe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1774,4 +1774,8 @@ protected void decreaseContainer(
   public float getReservableNodesRatio() {
     return reservableNodesRatio;
   }
+
+  long getNMHeartbeatInterval() {
+    return nmHeartbeatInterval;
+  }
 }
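Note: with the defaults in this patch and the stock 1000 ms NM heartbeat, the starvation-check back-off assembled in FSPreemptionThread works out to 15000 ms (waitTimeBeforeKill) + 4 x 1000 ms (heartbeats) + 10000 ms (waitTimeBeforeNextStarvationCheck) = 29 seconds; with continuous scheduling enabled, the middle term is 10 x the continuous-scheduling sleep instead. Since CONF_PREFIX is "yarn.scheduler.fair.", the undocumented override key below is yarn.scheduler.fair.waitTimeBeforeNextStarvationCheck.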
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index b18dd7dee4..231a6d4eda 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -114,12 +114,24 @@
   protected static final String PREEMPTION_THRESHOLD =
       CONF_PREFIX + "preemption.cluster-utilization-threshold";
   protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
-
-  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
-  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+
   protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
   protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
 
+  /**
+   * Configurable delay before an app's starvation is considered after it is
+   * identified. This is to give the scheduler enough time to
+   * allocate containers post preemption. This delay is added to the
+   * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats.
+   *
+   * This is intended as a backdoor on production clusters, and hence
+   * intentionally not documented.
+   */
+  protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK =
+      CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
+  protected static final long DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK =
+      10000;
+
   /** Whether to assign multiple containers in one check-in. */
   public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
   protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
@@ -251,8 +263,9 @@ public String getEventlogDir() {
         "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
   }
 
-  public int getPreemptionInterval() {
-    return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+  public long getWaitTimeBeforeNextStarvationCheck() {
+    return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK,
+        DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK);
   }
 
   public int getWaitTimeBeforeKill() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
index 25780cdcd2..079a842577 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -21,6 +21,8 @@ import java.util.Set;
 
 public class FairSchedulerWithMockPreemption extends FairScheduler {
+  static final long DELAY_FOR_NEXT_STARVATION_CHECK = 10 * 60 * 1000;
+
   @Override
   protected void createPreemptionThread() {
     preemptionThread = new MockPreemptionThread(this);
@@ -30,7 +32,7 @@ protected void createPreemptionThread() {
     private Set<FSAppAttempt> appsAdded = new HashSet<>();
     private int totalAppsAdded = 0;
 
-    MockPreemptionThread(FairScheduler scheduler) {
+    private MockPreemptionThread(FairScheduler scheduler) {
       super(scheduler);
     }
 
@@ -41,6 +43,7 @@ public void run() {
         FSAppAttempt app = context.getStarvedApps().take();
         appsAdded.add(app);
         totalAppsAdded++;
+        app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK);
       } catch (InterruptedException e) {
         return;
       }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
index a5b2d868d4..786a9839c8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
 
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
@@ -43,6 +44,8 @@
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
 
+  private final ControlledClock clock = new ControlledClock();
+
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
   private static final String[] QUEUES =
@@ -99,11 +102,17 @@ public void testPreemptionEnabled() throws Exception {
         + "minshare and fairshare queues", 3,
         preemptionThread.uniqueAppsAdded());
 
-    // Verify the apps get added again on a subsequent update
+    // Verify apps are added again only after the set delay for starvation has
+    // passed.
+    clock.tickSec(1);
     scheduler.update();
-    Thread.yield();
-
+    assertEquals("Apps re-added even before starvation delay passed",
+        preemptionThread.totalAppsAdded(), preemptionThread.uniqueAppsAdded());
     verifyLeafQueueStarvation();
+
+    clock.tickMsec(
+        FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK);
+    scheduler.update();
     assertTrue("Each app is marked as starved exactly once",
         preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
   }
@@ -141,7 +150,7 @@ private void setupClusterAndSubmitJobs() throws Exception {
     sendEnoughNodeUpdatesToAssignFully();
 
     // Sleep to hit the preemption timeouts
-    Thread.sleep(10);
+    clock.tickMsec(10);
 
     // Scheduler update to populate starved apps
     scheduler.update();
@@ -208,8 +217,9 @@ private void setupStarvedCluster() throws IOException {
         ALLOC_FILE.exists());
 
     resourceManager = new MockRM(conf);
-    resourceManager.start();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.setClock(clock);
+    resourceManager.start();
 
     preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
         scheduler.preemptionThread;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 16df1edd3c..a4d69bf221 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.util.ControlledClock;
 
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -49,6 +50,9 @@
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
   private static final int GB = 1024;
 
+  // Scheduler clock
+  private final ControlledClock clock = new ControlledClock();
+
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
 
@@ -60,25 +64,28 @@
   // Starving app that is expected to instigate preemption
   private FSAppAttempt starvingApp;
 
-  @Parameterized.Parameters
-  public static Collection getParameters() {
-    return Arrays.asList(new Boolean[][] {
-        {true}, {false}});
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> getParameters() {
+    return Arrays.asList(new Object[][] {
+        {"FairSharePreemption", true},
+        {"MinSharePreemption", false}});
   }
 
-  public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
+  public TestFairSchedulerPreemption(String name, boolean fairshare)
+      throws IOException {
     fairsharePreemption = fairshare;
     writeAllocFile();
   }
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
         ALLOC_FILE.getAbsolutePath());
     conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
     conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+    setupCluster();
   }
 
   @After
@@ -166,8 +173,9 @@ private void writeResourceParams(PrintWriter out) {
 
   private void setupCluster() throws IOException {
     resourceManager = new MockRM(conf);
-    resourceManager.start();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.setClock(clock);
+    resourceManager.start();
 
     // Create and add two nodes to the cluster
     addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
@@ -197,7 +205,7 @@ private void sendEnoughNodeUpdatesToAssignFully() {
    *
    * @param queueName queue name
    */
-  private void takeAllResource(String queueName) {
+  private void takeAllResources(String queueName) {
     // Create an app that takes up all the resources on the cluster
     ApplicationAttemptId appAttemptId
         = createSchedulingRequest(GB, 1, queueName, "default",
@@ -227,8 +235,8 @@ private void preemptHalfResources(String queueName)
         NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
     starvingApp = scheduler.getSchedulerApp(appAttemptId);
 
-    // Sleep long enough to pass
-    Thread.sleep(10);
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
     scheduler.update();
   }
 
@@ -243,14 +251,13 @@ private void preemptHalfResources(String queueName)
    */
   private void submitApps(String queue1, String queue2)
       throws InterruptedException {
-    takeAllResource(queue1);
+    takeAllResources(queue1);
     preemptHalfResources(queue2);
   }
 
   private void verifyPreemption() throws InterruptedException {
-    // Sleep long enough for four containers to be preempted. Note that the
-    // starved app must be queued four times for containers to be preempted.
-    for (int i = 0; i < 10000; i++) {
+    // Sleep long enough for four containers to be preempted.
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() == 4) {
         break;
       }
@@ -268,7 +275,7 @@ private void verifyPreemption() throws InterruptedException {
 
   private void verifyNoPreemption() throws InterruptedException {
     // Sleep long enough to ensure not even one container is preempted.
-    for (int i = 0; i < 600; i++) {
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() != 8) {
         break;
       }
@@ -279,7 +286,6 @@
 
   @Test
   public void testPreemptionWithinSameLeafQueue() throws Exception {
-    setupCluster();
     String queue = "root.preemptable.child-1";
     submitApps(queue, queue);
     if (fairsharePreemption) {
@@ -291,21 +297,18 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
 
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
     verifyPreemption();
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
     verifyPreemption();
   }
 
   @Test
   public void testNoPreemptionFromDisallowedQueue() throws Exception {
-    setupCluster();
     submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
     verifyNoPreemption();
   }
@@ -331,9 +334,7 @@ private void setNumAMContainersPerNode(int numAMContainersPerNode) {
 
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
-    setupCluster();
-
-    takeAllResource("root.preemptable.child-1");
+    takeAllResources("root.preemptable.child-1");
     setNumAMContainersPerNode(2);
 
     preemptHalfResources("root.preemptable.child-2");