diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 462e02add1..b58b493cb9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -176,6 +176,24 @@ public static Resource subtract(Resource lhs, Resource rhs) {
return subtractFrom(clone(lhs), rhs);
}
+ /**
+ * Subtract rhs from lhs and reset any negative
+ * values to zero.
+ * @param lhs {@link Resource} to subtract from
+ * @param rhs {@link Resource} to subtract
+ * @return the value of lhs after subtraction
+ */
+ public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
+ subtractFrom(lhs, rhs);
+ if (lhs.getMemorySize() < 0) {
+ lhs.setMemorySize(0);
+ }
+ if (lhs.getVirtualCores() < 0) {
+ lhs.setVirtualCores(0);
+ }
+ return lhs;
+ }
+
public static Resource negate(Resource resource) {
return subtract(NONE, resource);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 64427b762e..ce6d2a2fc5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -127,6 +127,7 @@
*/
protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval;
+ protected long nmHeartbeatInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -163,6 +164,9 @@ public void serviceInit(Configuration conf) throws Exception {
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ nmHeartbeatInterval =
+ conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
long configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 9e57fa7529..9e90d661dc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.text.DecimalFormat;
@@ -58,6 +59,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -87,6 +89,7 @@
private final Set<RMContainer> containersToPreempt = new HashSet<>();
private Resource fairshareStarvation = Resources.none();
private long lastTimeAtFairShare;
+ private long nextStarvationCheck;
// minShareStarvation attributed to this application by the leaf queue
private Resource minshareStarvation = Resources.none();
@@ -206,15 +209,9 @@ private void subtractResourcesOnBlacklistedNodes(
blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
}
for (FSSchedulerNode node: blacklistNodeIds) {
- Resources.subtractFrom(availableResources,
+ Resources.subtractFromNonNegative(availableResources,
node.getUnallocatedResource());
}
- if (availableResources.getMemorySize() < 0) {
- availableResources.setMemorySize(0);
- }
- if (availableResources.getVirtualCores() < 0) {
- availableResources.setVirtualCores(0);
- }
}
/**
@@ -525,6 +522,15 @@ Resource getStarvation() {
}
/**
+ * Get last computed fairshare starvation.
+ *
+ * @return last computed fairshare starvation
+ */
+ Resource getFairshareStarvation() {
+ return fairshareStarvation;
+ }
+
+ /**
* Set the minshare attributed to this application. To be called only from
* {@link FSLeafQueue#updateStarvedApps}.
*
@@ -1072,17 +1078,16 @@ boolean assignReservedContainer(FSSchedulerNode node) {
}
/**
- * Helper method that computes the extent of fairshare fairshareStarvation.
+ * Helper method that computes the extent of fairshare starvation.
*/
Resource fairShareStarvation() {
Resource threshold = Resources.multiply(
getFairShare(), fsQueue.getFairSharePreemptionThreshold());
- Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+ Resource starvation = Resources.componentwiseMin(threshold, demand);
+ Resources.subtractFromNonNegative(starvation, getResourceUsage());
long now = scheduler.getClock().getTime();
- boolean starved = Resources.greaterThan(
- fsQueue.getPolicy().getResourceCalculator(),
- scheduler.getClusterResource(), starvation, Resources.none());
+ boolean starved = !Resources.isNone(starvation);
if (!starved) {
lastTimeAtFairShare = now;
@@ -1106,6 +1111,97 @@ boolean isStarvedForFairShare() {
return !Resources.isNone(fairshareStarvation);
}
+ /**
+ * Helper method for {@link #getStarvedResourceRequests()}:
+ * Given a map of visited {@link ResourceRequest}s, it checks if
+ * {@link ResourceRequest} 'rr' has already been visited. The map is updated
+ * to reflect visiting 'rr'.
+ */
+ private static boolean checkAndMarkRRVisited(
+ Map<Priority, List<Resource>> visitedRRs, ResourceRequest rr) {
+ Priority priority = rr.getPriority();
+ Resource capability = rr.getCapability();
+ if (visitedRRs.containsKey(priority)) {
+ List<Resource> rrList = visitedRRs.get(priority);
+ if (rrList.contains(capability)) {
+ return true;
+ } else {
+ rrList.add(capability);
+ return false;
+ }
+ } else {
+ List<Resource> newRRList = new ArrayList<>();
+ newRRList.add(capability);
+ visitedRRs.put(priority, newRRList);
+ return false;
+ }
+ }
+
+ /**
+ * Fetch a list of RRs corresponding to the extent the app is starved
+ * (fairshare and minshare). This method considers the number of containers
+ * in a RR and also only one locality-level (the first encountered
+ * resourceName).
+ *
+ * @return list of {@link ResourceRequest}s corresponding to the amount of
+ * starvation.
+ */
+ List<ResourceRequest> getStarvedResourceRequests() {
+ List<ResourceRequest> ret = new ArrayList<>();
+ Map<Priority, List<Resource>> visitedRRs = new HashMap<>();
+
+ Resource pending = getStarvation();
+ for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+ if (Resources.isNone(pending)) {
+ break;
+ }
+ if (checkAndMarkRRVisited(visitedRRs, rr)) {
+ continue;
+ }
+
+ // Compute the number of containers of this capability that fit in the
+ // pending amount
+ int ratio = (int) Math.floor(
+ Resources.ratio(scheduler.getResourceCalculator(),
+ pending, rr.getCapability()));
+ if (ratio == 0) {
+ continue;
+ }
+
+ // If the RR is only partially being satisfied, include only the
+ // partial number of containers.
+ if (ratio < rr.getNumContainers()) {
+ rr = ResourceRequest.newInstance(
+ rr.getPriority(), rr.getResourceName(), rr.getCapability(), ratio);
+ }
+ ret.add(rr);
+ Resources.subtractFromNonNegative(pending,
+ Resources.multiply(rr.getCapability(), ratio));
+ }
+
+ return ret;
+ }
+
+ /**
+ * Notify this app that preemption has been triggered to make room for
+ * outstanding demand. The app should not be considered starved until after
+ * the specified delay.
+ *
+ * @param delayBeforeNextStarvationCheck duration to wait
+ */
+ void preemptionTriggered(long delayBeforeNextStarvationCheck) {
+ nextStarvationCheck =
+ scheduler.getClock().getTime() + delayBeforeNextStarvationCheck;
+ }
+
+ /**
+ * Whether this app's starvation should be considered.
+ */
+ boolean shouldCheckForStarvation() {
+ long now = scheduler.getClock().getTime();
+ return now > nextStarvationCheck;
+ }
+
/* Schedulable methods implementation */
@Override
@@ -1118,6 +1214,13 @@ public Resource getDemand() {
return demand;
}
+ /**
+ * Get the current app's unsatisfied demand.
+ */
+ Resource getPendingDemand() {
+ return Resources.subtract(demand, getResourceUsage());
+ }
+
@Override
public long getStartTime() {
return startTime;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 16070e0384..da88c5400f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -220,6 +220,62 @@ public void updateInternal(boolean checkStarvation) {
}
/**
+ * Compute the extent of fairshare starvation for a set of apps.
+ *
+ * @param appsWithDemand apps to compute fairshare starvation for
+ * @return aggregate fairshare starvation for all apps
+ */
+ private Resource updateStarvedAppsFairshare(
+ TreeSet<FSAppAttempt> appsWithDemand) {
+ Resource fairShareStarvation = Resources.clone(none());
+ // Fetch apps with unmet demand sorted by fairshare starvation
+ for (FSAppAttempt app : appsWithDemand) {
+ Resource appStarvation = app.fairShareStarvation();
+ if (!Resources.isNone(appStarvation)) {
+ context.getStarvedApps().addStarvedApp(app);
+ Resources.addTo(fairShareStarvation, appStarvation);
+ } else {
+ break;
+ }
+ }
+ return fairShareStarvation;
+ }
+
+ /**
+ * Distribute minshare starvation to a set of apps
+ * @param appsWithDemand set of apps
+ * @param minShareStarvation minshare starvation to distribute
+ */
+ private void updateStarvedAppsMinshare(
+ TreeSet<FSAppAttempt> appsWithDemand, Resource minShareStarvation) {
+ // Keep adding apps to the starved list until the unmet demand goes over
+ // the remaining minshare
+ for (FSAppAttempt app : appsWithDemand) {
+ if (!Resources.isNone(minShareStarvation)) {
+ Resource appMinShare = app.getPendingDemand();
+ Resources.subtractFromNonNegative(
+ appMinShare, app.getFairshareStarvation());
+
+ if (Resources.greaterThan(policy.getResourceCalculator(),
+ scheduler.getClusterResource(),
+ appMinShare, minShareStarvation)) {
+ Resources.subtractFromNonNegative(
+ appMinShare, minShareStarvation);
+ minShareStarvation = none();
+ } else {
+ Resources.subtractFrom(minShareStarvation, appMinShare);
+ }
+ app.setMinshareStarvation(appMinShare);
+ context.getStarvedApps().addStarvedApp(app);
+ } else {
+ // Reset minshare starvation in case we had set it in a previous
+ // iteration
+ app.resetMinshareStarvation();
+ }
+ }
+ }
+
+ /**
* Helper method to identify starved applications. This needs to be called
* ONLY from {@link #updateInternal}, after the application shares
* are updated.
@@ -237,44 +293,19 @@ public void updateInternal(boolean checkStarvation) {
* starved due to fairshare, there might still be starved applications.
*/
private void updateStarvedApps() {
- // First identify starved applications and track total amount of
- // starvation (in resources)
- Resource fairShareStarvation = Resources.clone(none());
+ // Fetch apps with pending demand
+ TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
- // Fetch apps with unmet demand sorted by fairshare starvation
- TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
- for (FSAppAttempt app : appsWithDemand) {
- Resource appStarvation = app.fairShareStarvation();
- if (!Resources.equals(Resources.none(), appStarvation)) {
- context.getStarvedApps().addStarvedApp(app);
- Resources.addTo(fairShareStarvation, appStarvation);
- } else {
- break;
- }
- }
+ // Process apps with fairshare starvation
+ Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand);
// Compute extent of minshare starvation
Resource minShareStarvation = minShareStarvation();
// Compute minshare starvation that is not subsumed by fairshare starvation
- Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+ Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation);
- // Keep adding apps to the starved list until the unmet demand goes over
- // the remaining minshare
- for (FSAppAttempt app : appsWithDemand) {
- if (Resources.greaterThan(policy.getResourceCalculator(),
- scheduler.getClusterResource(), minShareStarvation, none())) {
- Resource appPendingDemand =
- Resources.subtract(app.getDemand(), app.getResourceUsage());
- Resources.subtractFrom(minShareStarvation, appPendingDemand);
- app.setMinshareStarvation(appPendingDemand);
- context.getStarvedApps().addStarvedApp(app);
- } else {
- // Reset minshare starvation in case we had set it in a previous
- // iteration
- app.resetMinshareStarvation();
- }
- }
+ updateStarvedAppsMinshare(appsWithDemand, minShareStarvation);
}
@Override
@@ -352,7 +383,7 @@ public Resource assignContainer(FSSchedulerNode node) {
return assigned;
}
- for (FSAppAttempt sched : fetchAppsWithDemand()) {
+ for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
continue;
}
@@ -368,14 +399,24 @@ public Resource assignContainer(FSSchedulerNode node) {
return assigned;
}
- private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+ /**
+ * Fetch the subset of apps that have unmet demand. When used for
+ * preemption-related code (as opposed to allocation), omits apps that
+ * should not be checked for starvation.
+ *
+ * @param assignment whether the apps are for allocation containers, as
+ * opposed to preemption calculations
+ * @return Set of apps with unmet demand
+ */
+ private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
TreeSet<FSAppAttempt> pendingForResourceApps =
new TreeSet<>(policy.getComparator());
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
Resource pending = app.getAppAttemptResourceUsage().getPending();
- if (!pending.equals(none())) {
+ if (!Resources.isNone(pending) &&
+ (assignment || app.shouldCheckForStarvation())) {
pendingForResourceApps.add(app);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index f166878d99..af73c10f79 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -26,8 +26,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
@@ -43,20 +41,26 @@
protected final FSContext context;
private final FairScheduler scheduler;
private final long warnTimeBeforeKill;
+ private final long delayBeforeNextStarvationCheck;
private final Timer preemptionTimer;
FSPreemptionThread(FairScheduler scheduler) {
+ setDaemon(true);
+ setName("FSPreemptionThread");
this.scheduler = scheduler;
this.context = scheduler.getContext();
FairSchedulerConfiguration fsConf = scheduler.getConf();
context.setPreemptionEnabled();
context.setPreemptionUtilizationThreshold(
fsConf.getPreemptionUtilizationThreshold());
- warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
preemptionTimer = new Timer("Preemption Timer", true);
- setDaemon(true);
- setName("FSPreemptionThread");
+ warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+ long allocDelay = (fsConf.isContinuousSchedulingEnabled()
+ ? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs
+ : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
+ delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
+ fsConf.getWaitTimeBeforeNextStarvationCheck();
}
public void run() {
@@ -64,13 +68,8 @@ public void run() {
FSAppAttempt starvedApp;
try{
starvedApp = context.getStarvedApps().take();
- if (!Resources.isNone(starvedApp.getStarvation())) {
- PreemptableContainers containers =
- identifyContainersToPreempt(starvedApp);
- if (containers != null) {
- preemptContainers(containers.containers);
- }
- }
+ preemptContainers(identifyContainersToPreempt(starvedApp));
+ starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
} catch (InterruptedException e) {
LOG.info("Preemption thread interrupted! Exiting.");
return;
@@ -79,58 +78,57 @@ public void run() {
}
/**
- * Given an app, identify containers to preempt to satisfy the app's next
- * resource request.
+ * Given an app, identify containers to preempt to satisfy the app's
+ * starvation.
+ *
+ * Mechanics:
+ * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of
+ * starvation.
+ * 2. For each {@link ResourceRequest}, iterate through matching
+ * nodes and identify containers to preempt all on one node, also
+ * optimizing for least number of AM container preemptions.
*
* @param starvedApp starved application for which we are identifying
* preemption targets
- * @return list of containers to preempt to satisfy starvedApp, null if the
- * app cannot be satisfied by preempting any running containers
+ * @return list of containers to preempt to satisfy starvedApp
*/
- private PreemptableContainers identifyContainersToPreempt(
- private List<RMContainer> identifyContainersToPreempt(
FSAppAttempt starvedApp) {
- PreemptableContainers bestContainers = null;
-
- // Find the nodes that match the next resource request
- SchedulingPlacementSet nextPs =
- starvedApp.getAppSchedulingInfo().getFirstSchedulingPlacementSet();
- PendingAsk firstPendingAsk = nextPs.getPendingAsk(ResourceRequest.ANY);
- // TODO (KK): Should we check other resource requests if we can't match
- // the first one?
-
- Resource requestCapability = firstPendingAsk.getPerAllocationResource();
-
- List potentialNodes =
- scheduler.getNodeTracker().getNodesByResourceName(
- nextPs.getAcceptedResouceNames().next().toString());
+ List<RMContainer> containersToPreempt = new ArrayList<>();
+
+ // Iterate through enough RRs to address app's starvation
+ for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
+ for (int i = 0; i < rr.getNumContainers(); i++) {
+ PreemptableContainers bestContainers = null;
+ List potentialNodes = scheduler.getNodeTracker()
+ .getNodesByResourceName(rr.getResourceName());
+ for (FSSchedulerNode node : potentialNodes) {
+ // TODO (YARN-5829): Attempt to reserve the node for starved app.
+ if (isNodeAlreadyReserved(node, starvedApp)) {
+ continue;
+ }
- // From the potential nodes, pick a node that has enough containers
- // from apps over their fairshare
- for (FSSchedulerNode node : potentialNodes) {
- // TODO (YARN-5829): Attempt to reserve the node for starved app. The
- // subsequent if-check needs to be reworked accordingly.
- FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
- if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
- // This node is already reserved by another app. Let us not consider
- // this for preemption.
- continue;
- }
+ int maxAMContainers = bestContainers == null ?
+ Integer.MAX_VALUE : bestContainers.numAMContainers;
+ PreemptableContainers preemptableContainers =
+ identifyContainersToPreemptOnNode(
+ rr.getCapability(), node, maxAMContainers);
+ if (preemptableContainers != null) {
+ // This set is better than any previously identified set.
+ bestContainers = preemptableContainers;
+ if (preemptableContainers.numAMContainers == 0) {
+ break;
+ }
+ }
+ } // End of iteration through nodes for one RR
- int maxAMContainers = bestContainers == null ?
- Integer.MAX_VALUE : bestContainers.numAMContainers;
- PreemptableContainers preemptableContainers =
- identifyContainersToPreemptOnNode(requestCapability, node,
- maxAMContainers);
- if (preemptableContainers != null) {
- if (preemptableContainers.numAMContainers == 0) {
- return preemptableContainers;
- } else {
- bestContainers = preemptableContainers;
+ if (bestContainers != null && bestContainers.containers.size() > 0) {
+ containersToPreempt.addAll(bestContainers.containers);
+ trackPreemptionsAgainstNode(bestContainers.containers);
}
}
- }
-
- return bestContainers;
+ } // End of iteration over RRs
+ return containersToPreempt;
}
/**
@@ -181,23 +179,25 @@ private PreemptableContainers identifyContainersToPreemptOnNode(
return null;
}
- private void preemptContainers(List<RMContainer> containers) {
- // Mark the containers as being considered for preemption on the node.
- // Make sure the containers are subsequently removed by calling
- // FSSchedulerNode#removeContainerForPreemption.
- if (containers.size() > 0) {
- FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
- .getNode(containers.get(0).getNodeId());
- node.addContainersForPreemption(containers);
- }
+ private boolean isNodeAlreadyReserved(
+ FSSchedulerNode node, FSAppAttempt app) {
+ FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+ return nodeReservedApp != null && !nodeReservedApp.equals(app);
+ }
+ private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+ FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
+ .getNode(containers.get(0).getNodeId());
+ node.addContainersForPreemption(containers);
+ }
+
+ private void preemptContainers(List<RMContainer> containers) {
// Warn application about containers to be killed
for (RMContainer container : containers) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
- FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container " + container +
- " from queue " + queue.getName());
+ " from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 134efff7a0..18806bcbfe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1774,4 +1774,8 @@ protected void decreaseContainer(
public float getReservableNodesRatio() {
return reservableNodesRatio;
}
+
+ long getNMHeartbeatInterval() {
+ return nmHeartbeatInterval;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index b18dd7dee4..231a6d4eda 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -114,12 +114,24 @@
protected static final String PREEMPTION_THRESHOLD =
CONF_PREFIX + "preemption.cluster-utilization-threshold";
protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
-
- protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
- protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+
protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
+ /**
+ * Configurable delay before an app's starvation is considered after it is
+ * identified. This is to give the scheduler enough time to
+ * allocate containers post preemption. This delay is added to the
+ * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats.
+ *
+ * This is intended as a backdoor on production clusters, and hence
+ * intentionally not documented.
+ */
+ protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK =
+ CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
+ protected static final long DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK =
+ 10000;
+
/** Whether to assign multiple containers in one check-in. */
public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
@@ -251,8 +263,9 @@ public String getEventlogDir() {
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
}
- public int getPreemptionInterval() {
- return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+ public long getWaitTimeBeforeNextStarvationCheck() {
+ return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK,
+ DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK);
}
public int getWaitTimeBeforeKill() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
index 25780cdcd2..079a842577 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -21,6 +21,8 @@
import java.util.Set;
public class FairSchedulerWithMockPreemption extends FairScheduler {
+ static final long DELAY_FOR_NEXT_STARVATION_CHECK = 10 * 60 * 1000;
+
@Override
protected void createPreemptionThread() {
preemptionThread = new MockPreemptionThread(this);
@@ -30,7 +32,7 @@ protected void createPreemptionThread() {
private Set<FSAppAttempt> appsAdded = new HashSet<>();
private int totalAppsAdded = 0;
- MockPreemptionThread(FairScheduler scheduler) {
+ private MockPreemptionThread(FairScheduler scheduler) {
super(scheduler);
}
@@ -41,6 +43,7 @@ public void run() {
FSAppAttempt app = context.getStarvedApps().take();
appsAdded.add(app);
totalAppsAdded++;
+ app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK);
} catch (InterruptedException e) {
return;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
index a5b2d868d4..786a9839c8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After;
import static org.junit.Assert.assertEquals;
@@ -43,6 +44,8 @@
private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
+ private final ControlledClock clock = new ControlledClock();
+
// Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
private static final int NODE_CAPACITY_MULTIPLE = 4;
private static final String[] QUEUES =
@@ -99,11 +102,17 @@ public void testPreemptionEnabled() throws Exception {
+ "minshare and fairshare queues",
3, preemptionThread.uniqueAppsAdded());
- // Verify the apps get added again on a subsequent update
+ // Verify apps are added again only after the set delay for starvation has
+ // passed.
+ clock.tickSec(1);
scheduler.update();
- Thread.yield();
-
+ assertEquals("Apps re-added even before starvation delay passed",
+ preemptionThread.totalAppsAdded(), preemptionThread.uniqueAppsAdded());
verifyLeafQueueStarvation();
+
+ clock.tickMsec(
+ FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK);
+ scheduler.update();
assertTrue("Each app is marked as starved exactly once",
preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
}
@@ -141,7 +150,7 @@ private void setupClusterAndSubmitJobs() throws Exception {
sendEnoughNodeUpdatesToAssignFully();
// Sleep to hit the preemption timeouts
- Thread.sleep(10);
+ clock.tickMsec(10);
// Scheduler update to populate starved apps
scheduler.update();
@@ -208,8 +217,9 @@ private void setupStarvedCluster() throws IOException {
ALLOC_FILE.exists());
resourceManager = new MockRM(conf);
- resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+ scheduler.setClock(clock);
+ resourceManager.start();
preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
scheduler.preemptionThread;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 16df1edd3c..a4d69bf221 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -49,6 +50,9 @@
private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
private static final int GB = 1024;
+ // Scheduler clock
+ private final ControlledClock clock = new ControlledClock();
+
// Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
private static final int NODE_CAPACITY_MULTIPLE = 4;
@@ -60,25 +64,28 @@
// Starving app that is expected to instigate preemption
private FSAppAttempt starvingApp;
- @Parameterized.Parameters
- public static Collection getParameters() {
- return Arrays.asList(new Boolean[][] {
- {true}, {false}});
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection