diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
index a5aee74..1f4e7c7 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
@@ -75,7 +75,7 @@ private long getMemorySize(Schedulable schedulable, Metric metric) {
case DEMAND:
return schedulable.getDemand().getMemorySize();
case USAGE:
- return schedulable.getResourceUsage().getMemorySize();
+ return schedulable.getGuaranteedResourceUsage().getMemorySize();
case MINSHARE:
return schedulable.getMinShare().getMemorySize();
case MAXSHARE:
@@ -96,7 +96,7 @@ private int getVirtualCores(Schedulable schedulable, Metric metric) {
case DEMAND:
return schedulable.getDemand().getVirtualCores();
case USAGE:
- return schedulable.getResourceUsage().getVirtualCores();
+ return schedulable.getGuaranteedResourceUsage().getVirtualCores();
case MINSHARE:
return schedulable.getMinShare().getVirtualCores();
case MAXSHARE:
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c8ab62a..6352d3d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -266,6 +266,11 @@ private static void addDeprecatedKeys() {
/** UserGroupMappingPlacementRule configuration string. */
public static final String USER_GROUP_PLACEMENT_RULE = "user-group";
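+
+  /**
+   * Whether the scheduler oversubscribes node resources by allocating
+   * OPPORTUNISTIC containers on top of GUARANTEED ones, based on actual
+   * node utilization.
+   */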
+ public static final String RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED =
+ RM_PREFIX + "scheduler.oversubscription.enabled";
+  public static final boolean DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED
+      = false;
+
/** Enable Resource Manager webapp ui actions */
public static final String RM_WEBAPP_UI_ACTIONS_ENABLED =
RM_PREFIX + "webapp.ui-actions.enabled";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9e97ddf..ad1b32f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -381,6 +381,15 @@
+
+  <property>
+    <description>If set to true, the scheduler will try to over-allocate
+      resources on the nodes that allow overallocation.
+    </description>
+    <name>yarn.resourcemanager.scheduler.oversubscription.enabled</name>
+    <value>false</value>
+  </property>
+
  <property>
    <description>Enable RM to recover state after starting. If true, then
      yarn.resourcemanager.store.class must be specified.</description>
    <name>yarn.resourcemanager.recovery.enabled</name>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 2276d2c..4b4992a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -38,6 +38,8 @@
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -58,6 +60,10 @@
private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
private Resource capacity;
+ // The resource available within the node's capacity that can be given out
+ // to run GUARANTEED containers, including reserved, preempted and any
+ // remaining free resources. Resources allocated to OPPORTUNISTIC containers
+  // are tracked in allocatedResourceOpportunistic.
private Resource unallocatedResource = Resource.newInstance(0, 0);
private RMContainer reservedContainer;
@@ -570,6 +576,48 @@ public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
+ /**
+ * Get the amount of resources that can be allocated to opportunistic
+   * containers in the case of overallocation. It is calculated as
+   * overallocation threshold * node capacity - (node utilization +
+   * resources of allocated-yet-not-started containers).
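+   * For example (illustrative numbers): with 100GB of memory capacity, a
+   * memory overallocation threshold of 0.9, 50GB of measured utilization
+   * and 10GB allocated but not yet launched, up to
+   * 100 * 0.9 - (50 + 10) = 30GB may be offered to OPPORTUNISTIC containers.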
+ * @return the amount of resources that are available to be allocated to
+ * opportunistic containers
+ */
+ public synchronized Resource allowedResourceForOverAllocation() {
+ OverAllocationInfo overAllocationInfo = rmNode.getOverAllocationInfo();
+ if (overAllocationInfo == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Overallocation is disabled on node: "
+            + rmNode.getHostName());
+      }
+ return Resources.none();
+ }
+
+ ResourceUtilization projectedNodeUtilization = ResourceUtilization.
+ newInstance(getNodeUtilization());
+ // account for resources allocated in this heartbeat
+ projectedNodeUtilization.addTo(
+ (int) (resourceAllocatedPendingLaunch.getMemorySize()), 0,
+ (float) resourceAllocatedPendingLaunch.getVirtualCores() /
+ capacity.getVirtualCores());
+
+ ResourceThresholds thresholds =
+ overAllocationInfo.getOverAllocationThresholds();
+ Resource overAllocationThreshold = Resources.createResource(
+ (long) (capacity.getMemorySize() * thresholds.getMemoryThreshold()),
+ (int) (capacity.getVirtualCores() * thresholds.getCpuThreshold()));
+ long allowedMemory = Math.max(0, overAllocationThreshold.getMemorySize()
+ - projectedNodeUtilization.getPhysicalMemory());
+ int allowedCpu = Math.max(0, (int)
+ (overAllocationThreshold.getVirtualCores() -
+ projectedNodeUtilization.getCPU() * capacity.getVirtualCores()));
+
+ Resource resourceAllowedForOpportunisticContainers =
+ Resources.createResource(allowedMemory, allowedCpu);
+
+    // TODO cap the resources allocated to OPPORTUNISTIC containers on a node
+    // in terms of its capacity, i.e. return min(max_ratio * capacity, allowed)
+    return resourceAllowedForOpportunisticContainers;
+  }
+
private static class ContainerInfo {
private final RMContainer container;
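
As a rough illustration of the overallocation arithmetic in
allowedResourceForOverAllocation() above (hypothetical numbers, standalone
Java in place of the YARN record types):

    public class OverAllocationMathSketch {
      public static void main(String[] args) {
        // Hypothetical node: 100 GB of memory capacity, 16 vcores.
        long capacityMb = 100 * 1024;
        int capacityVcores = 16;
        // Hypothetical thresholds from the node's OverAllocationInfo.
        double memThreshold = 0.9;
        double cpuThreshold = 0.8;
        // Projected utilization: measured usage plus containers that are
        // allocated but not yet launched.
        long projectedMemMb = 50 * 1024 + 10 * 1024;
        float projectedCpuFraction = 0.5f; // fraction of all vcores in use
        long allowedMemMb = Math.max(0,
            (long) (capacityMb * memThreshold) - projectedMemMb);
        int allowedVcores = Math.max(0, (int) (capacityVcores * cpuThreshold
            - projectedCpuFraction * capacityVcores));
        // Prints "30720 MB, 4 vcores": 92160 - 61440 MB, (int) (12.8 - 8.0)
        System.out.println(allowedMemMb + " MB, " + allowedVcores + " vcores");
      }
    }
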
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 3024558..ffd2192 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -175,30 +176,35 @@ void containerCompleted(RMContainer rmContainer,
}
}
- private void unreserveInternal(
+ private boolean unreserveInternal(
SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
try {
writeLock.lock();
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
schedulerKey);
- RMContainer reservedContainer = reservedContainers.remove(
- node.getNodeID());
- if (reservedContainers.isEmpty()) {
- this.reservedContainers.remove(schedulerKey);
- }
+ boolean unreserved = false;
+ if (reservedContainers != null) {
+ RMContainer reservedContainer = reservedContainers.remove(
+ node.getNodeID());
+ if (reservedContainers.isEmpty()) {
+ this.reservedContainers.remove(schedulerKey);
+ }
- // Reset the re-reservation count
- resetReReservations(schedulerKey);
+ // Reset the re-reservation count
+ resetReReservations(schedulerKey);
- Resource resource = reservedContainer.getContainer().getResource();
- this.attemptResourceUsage.decReserved(resource);
+ Resource resource = reservedContainer.getContainer().getResource();
+ this.attemptResourceUsage.decReserved(resource);
+ unreserved = true;
- LOG.info(
- "Application " + getApplicationId() + " unreserved " + " on node "
- + node + ", currently has " + reservedContainers.size()
- + " at priority " + schedulerKey.getPriority()
- + "; currentReservation " + this.attemptResourceUsage
- .getReserved());
+        LOG.info(
+            "Application " + getApplicationId() + " unreserved on node "
+                + node + ", currently has " + reservedContainers.size()
+                + " at priority " + schedulerKey.getPriority()
+                + "; currentReservation " + this.attemptResourceUsage
+                .getReserved());
+ }
+ return unreserved;
} finally {
writeLock.unlock();
}
@@ -226,7 +232,7 @@ public Resource getHeadroom() {
SchedulingPolicy policy = fsQueue.getPolicy();
Resource queueFairShare = fsQueue.getFairShare();
- Resource queueUsage = fsQueue.getResourceUsage();
+ Resource queueUsage = fsQueue.getGuaranteedResourceUsage();
Resource clusterResource = this.scheduler.getClusterResource();
Resource clusterUsage = this.scheduler.getRootQueueMetrics()
.getAllocatedResources();
@@ -417,7 +423,7 @@ NodeType getAllowedLocalityLevelByTime(
public RMContainer allocate(NodeType type, FSSchedulerNode node,
SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
- Container reservedContainer) {
+ Container reservedContainer, boolean opportunistic) {
RMContainer rmContainer;
Container container;
@@ -442,9 +448,11 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
}
container = reservedContainer;
+ ExecutionType executionType = opportunistic ?
+ ExecutionType.OPPORTUNISTIC : ExecutionType.GUARANTEED;
if (container == null) {
container = createContainer(node, pendingAsk.getPerAllocationResource(),
- schedulerKey);
+ schedulerKey, executionType);
}
// Create RMContainer
@@ -460,7 +468,11 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
- this.attemptResourceUsage.incUsed(container.getResource());
+ if (executionType.equals(ExecutionType.GUARANTEED)) {
+ this.attemptResourceUsage.incUsed(container.getResource());
+ } else {
+ this.attemptOpportunisticResourceUsage.incUsed(container.getResource());
+ }
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
@@ -607,7 +619,7 @@ boolean canContainerBePreempted(RMContainer container) {
// Check if the app's allocation will be over its fairshare even
// after preempting this container
- Resource usageAfterPreemption = Resources.clone(getResourceUsage());
+ Resource usageAfterPreemption = Resources.clone(getGuaranteedResourceUsage());
// Subtract resources of containers already queued for preemption
synchronized (preemptionVariablesLock) {
@@ -631,7 +643,7 @@ boolean canContainerBePreempted(RMContainer container) {
* @return Container
*/
private Container createContainer(FSSchedulerNode node, Resource capability,
- SchedulerRequestKey schedulerKey) {
+ SchedulerRequestKey schedulerKey, ExecutionType executionType) {
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId = BuilderUtils.newContainerId(
@@ -641,7 +653,7 @@ private Container createContainer(FSSchedulerNode node, Resource capability,
return BuilderUtils.newContainer(containerId, nodeId,
node.getRMNode().getHttpAddress(), capability,
schedulerKey.getPriority(), null,
- schedulerKey.getAllocationRequestId());
+ executionType, schedulerKey.getAllocationRequestId());
}
/**
@@ -665,7 +677,7 @@ private boolean reserve(Resource perAllocationResource, FSSchedulerNode node,
if (reservedContainer == null) {
reservedContainer =
createContainer(node, perAllocationResource,
- schedulerKey);
+ schedulerKey, ExecutionType.GUARANTEED);
getMetrics().reserveResource(node.getPartition(), getUser(),
reservedContainer.getResource());
RMContainer rmContainer =
@@ -722,11 +734,12 @@ private boolean reservationExceedsThreshold(FSSchedulerNode node,
public void unreserve(SchedulerRequestKey schedulerKey,
FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
- unreserveInternal(schedulerKey, node);
- node.unreserveResource(this);
- clearReservation(node);
- getMetrics().unreserveResource(node.getPartition(),
- getUser(), rmContainer.getContainer().getResource());
+ if (unreserveInternal(schedulerKey, node)) {
+ node.unreserveResource(this);
+ clearReservation(node);
+ getMetrics().unreserveResource(node.getPartition(),
+ getUser(), rmContainer.getContainer().getResource());
+ }
}
private void setReservation(SchedulerNode node) {
@@ -800,13 +813,15 @@ int getNumReservations(String rackName, boolean isAny) {
*/
private Resource assignContainer(
FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
- boolean reserved, SchedulerRequestKey schedulerKey) {
+ boolean reserved, boolean opportunistic,
+ SchedulerRequestKey schedulerKey) {
// How much does this request need?
Resource capability = pendingAsk.getPerAllocationResource();
// How much does the node have?
- Resource available = node.getUnallocatedResource();
+ Resource available = opportunistic ? node.allowedResourceForOverAllocation() :
+ node.getUnallocatedResource();
Container reservedContainer = null;
if (reserved) {
@@ -818,33 +833,39 @@ private Resource assignContainer(
// Inform the application of the new container for this request
RMContainer allocatedContainer =
allocate(type, node, schedulerKey, pendingAsk,
- reservedContainer);
- if (allocatedContainer == null) {
- // Did the application need this resource?
- if (reserved) {
- unreserve(schedulerKey, node);
- }
- return Resources.none();
- }
+ reservedContainer, opportunistic);
- // If we had previously made a reservation, delete it
+ // delete the previous reservation, if any
if (reserved) {
unreserve(schedulerKey, node);
}
- // Inform the node
- node.allocateContainer(allocatedContainer);
+ if (allocatedContainer != null) {
+ if (opportunistic) {
+ // if an OPPORTUNISTIC container is allocated, we need to
+ // unreserve anything that we may have reserved in our
+ // previous attempt to assign GUARANTEED containers for this
+ // scheduling request.
+ unreserve(schedulerKey, node);
+ }
- // If not running unmanaged, the first container we allocate is always
- // the AM. Set the amResource for this app and update the leaf queue's AM
- // usage
- if (!isAmRunning() && !getUnmanagedAM()) {
- setAMResource(capability);
- getQueue().addAMResourceUsage(capability);
- setAmRunning(true);
- }
- return capability;
+ // Inform the node
+ node.allocateContainer(allocatedContainer);
+
+ // If not running unmanaged, the first container we allocate
+ // is always the AM. Set amResource for this app and update
+ // the leaf queue's AM usage
+ if (!isAmRunning() && !getUnmanagedAM()) {
+ setAMResource(capability);
+ getQueue().addAMResourceUsage(capability);
+ setAmRunning(true);
+ }
+
+ return capability;
+ } else {
+ return Resources.none();
+ }
}
if (LOG.isDebugEnabled()) {
@@ -855,7 +876,7 @@ private Resource assignContainer(
// The desired container won't fit here, so reserve
// Reserve only, if app does not wait for preempted resources on the node,
// otherwise we may end up with duplicate reservations
- if (isReservable(capability) &&
+ if (isReservable(capability) && !opportunistic &&
!node.isPreemptedForApp(this) &&
reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
type, schedulerKey)) {
@@ -900,7 +921,8 @@ private boolean isOverAMShareLimit() {
return false;
}
- private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+ private Resource assignContainer(FSSchedulerNode node, boolean opportunistic,
+ boolean reserved) {
if (LOG.isTraceEnabled()) {
LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
}
@@ -963,7 +985,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
- reserved, schedulerKey);
+ reserved, opportunistic, schedulerKey);
}
if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
@@ -980,7 +1002,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
- reserved, schedulerKey);
+ reserved, opportunistic, schedulerKey);
}
PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
@@ -1000,7 +1022,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
- reserved, schedulerKey);
+ reserved, opportunistic, schedulerKey);
}
}
@@ -1101,7 +1123,7 @@ boolean assignReservedContainer(FSSchedulerNode node) {
// there's only one container size per priority.
if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
node.getUnallocatedResource())) {
- assignContainer(node, true);
+ assignContainer(node, false, true);
}
return true;
}
@@ -1117,7 +1139,7 @@ Resource fairShareStarvation() {
Resource fairDemand = Resources.componentwiseMin(threshold, demand);
// Check if the queue is starved for fairshare
- boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand);
+ boolean starved = isUsageBelowShare(getGuaranteedResourceUsage(), fairDemand);
if (!starved) {
lastTimeAtFairShare = now;
@@ -1130,7 +1152,7 @@ Resource fairShareStarvation() {
} else {
// The app has been starved for longer than preemption-timeout.
fairshareStarvation =
- Resources.subtractFromNonNegative(fairDemand, getResourceUsage());
+ Resources.subtractFromNonNegative(fairDemand, getGuaranteedResourceUsage());
}
return fairshareStarvation;
}
@@ -1149,7 +1171,7 @@ private boolean isUsageBelowShare(Resource usage, Resource share) {
* @return true if the app is starved for fairshare, false otherwise
*/
boolean isStarvedForFairShare() {
- return isUsageBelowShare(getResourceUsage(), getFairShare());
+ return isUsageBelowShare(getGuaranteedResourceUsage(), getFairShare());
}
/**
@@ -1250,7 +1272,7 @@ public Resource getDemand() {
* Get the current app's unsatisfied demand.
*/
Resource getPendingDemand() {
- return Resources.subtract(demand, getResourceUsage());
+ return Resources.subtract(demand, getGuaranteedResourceUsage());
}
@Override
@@ -1269,11 +1291,16 @@ public Resource getMaxShare() {
}
@Override
- public Resource getResourceUsage() {
+ public Resource getGuaranteedResourceUsage() {
return getCurrentConsumption();
}
@Override
+ public Resource getOpportunisticResourceUsage() {
+ return attemptOpportunisticResourceUsage.getUsed();
+ }
+
+ @Override
public float getWeight() {
return scheduler.getAppWeight(this);
}
@@ -1315,7 +1342,7 @@ public void updateDemand() {
}
@Override
- public Resource assignContainer(FSSchedulerNode node) {
+ public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
if (isOverAMShareLimit()) {
PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
@@ -1327,7 +1354,7 @@ public Resource assignContainer(FSSchedulerNode node) {
}
return Resources.none();
}
- return assignContainer(node, false);
+ return assignContainer(node, opportunistic, false);
}
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 60477d4..45e6b1e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -293,15 +293,32 @@ public Resource getDemand() {
}
@Override
- public Resource getResourceUsage() {
+ public Resource getGuaranteedResourceUsage() {
Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
- Resources.addTo(usage, app.getResourceUsage());
+ Resources.addTo(usage, app.getGuaranteedResourceUsage());
}
for (FSAppAttempt app : nonRunnableApps) {
- Resources.addTo(usage, app.getResourceUsage());
+ Resources.addTo(usage, app.getGuaranteedResourceUsage());
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return usage;
+ }
+
+ @Override
+ public Resource getOpportunisticResourceUsage() {
+ Resource usage = Resource.newInstance(0, 0);
+ readLock.lock();
+ try {
+ for (FSAppAttempt app : runnableApps) {
+ Resources.addTo(usage, app.getOpportunisticResourceUsage());
+ }
+ for (FSAppAttempt app : nonRunnableApps) {
+ Resources.addTo(usage, app.getOpportunisticResourceUsage());
}
} finally {
readLock.unlock();
@@ -342,14 +359,14 @@ public void updateDemand() {
}
@Override
- public Resource assignContainer(FSSchedulerNode node) {
+ public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
Resource assigned = none();
if (LOG.isDebugEnabled()) {
LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
getName() + " fairShare: " + getFairShare());
}
- if (!assignContainerPreCheck(node)) {
+ if (!assignContainerPreCheck(node, opportunistic)) {
return assigned;
}
@@ -357,7 +374,7 @@ public Resource assignContainer(FSSchedulerNode node) {
if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
continue;
}
- assigned = sched.assignContainer(node);
+ assigned = sched.assignContainer(node, opportunistic);
if (!assigned.equals(none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned container in queue:" + getName() + " " +
@@ -555,7 +572,7 @@ private Resource minShareStarvation() {
Resource desiredShare = Resources.min(policy.getResourceCalculator(),
scheduler.getClusterResource(), getMinShare(), getDemand());
- Resource starvation = Resources.subtract(desiredShare, getResourceUsage());
+ Resource starvation = Resources.subtract(desiredShare, getGuaranteedResourceUsage());
boolean starved = !Resources.isNone(starvation);
long now = scheduler.getClock().getTime();
@@ -613,7 +630,7 @@ protected void dumpStateInternal(StringBuilder sb) {
", SteadyFairShare: " + getSteadyFairShare() +
", MaxShare: " + getMaxShare() +
", MinShare: " + minShare +
- ", ResourceUsage: " + getResourceUsage() +
+ ", ResourceUsage: " + getGuaranteedResourceUsage() +
", Demand: " + getDemand() +
", Runnable: " + getNumRunnableApps() +
", NumPendingApps: " + getNumPendingApps() +
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index e42c9f5..dabcf59 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -119,12 +119,26 @@ public Resource getDemand() {
}
@Override
- public Resource getResourceUsage() {
+ public Resource getGuaranteedResourceUsage() {
Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSQueue child : childQueues) {
- Resources.addTo(usage, child.getResourceUsage());
+ Resources.addTo(usage, child.getGuaranteedResourceUsage());
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return usage;
+ }
+
+ @Override
+ public Resource getOpportunisticResourceUsage() {
+ Resource usage = Resource.newInstance(0, 0);
+ readLock.lock();
+ try {
+ for (FSQueue child : childQueues) {
+ Resources.addTo(usage, child.getOpportunisticResourceUsage());
}
} finally {
readLock.unlock();
@@ -191,14 +205,17 @@ private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) {
}
@Override
- public Resource assignContainer(FSSchedulerNode node) {
+ public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
Resource assigned = Resources.none();
// If this queue is over its limit, reject
- if (!assignContainerPreCheck(node)) {
+ if (!assignContainerPreCheck(node, opportunistic)) {
return assigned;
}
+    // TODO: try to promote OPPORTUNISTIC containers if opportunistic is true,
+    // i.e. promote existing OPPORTUNISTIC containers before trying to
+    // allocate new ones.
+
// Hold the write lock when sorting childQueues
writeLock.lock();
try {
@@ -218,7 +235,7 @@ public Resource assignContainer(FSSchedulerNode node) {
readLock.lock();
try {
for (FSQueue child : childQueues) {
- assigned = child.assignContainer(node);
+ assigned = child.assignContainer(node, opportunistic);
if (!Resources.equals(assigned, Resources.none())) {
break;
}
@@ -302,7 +319,7 @@ protected void dumpStateInternal(StringBuilder sb) {
", SteadyFairShare: " + getSteadyFairShare() +
", MaxShare: " + getMaxShare() +
", MinShare: " + minShare +
- ", ResourceUsage: " + getResourceUsage() +
+ ", ResourceUsage: " + getGuaranteedResourceUsage() +
", Demand: " + getDemand() +
", MaxAMShare: " + maxAMShare +
", Runnable: " + getNumRunnableApps() +
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index e0df480..98a13ab 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -233,7 +233,7 @@ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
if (getFairShare().getMemorySize() == 0) {
queueInfo.setCurrentCapacity(0.0f);
} else {
- queueInfo.setCurrentCapacity((float) getResourceUsage().getMemorySize() /
+ queueInfo.setCurrentCapacity((float) getGuaranteedResourceUsage().getMemorySize() /
getFairShare().getMemorySize());
}
@@ -417,14 +417,17 @@ public abstract void collectSchedulerApplications(
*
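+   * @param opportunistic whether the check is for an OPPORTUNISTIC
+   *                      container assignment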
* @return true if check passes (can assign) or false otherwise
*/
- boolean assignContainerPreCheck(FSSchedulerNode node) {
- if (node.getReservedContainer() != null) {
+ boolean assignContainerPreCheck(FSSchedulerNode node, boolean opportunistic) {
+ if (opportunistic) {
+ // always pre-approve OPPORTUNISTIC containers to be assigned on the node
+ return true;
+ } else if (node.getReservedContainer() != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Assigning container failed on node '" + node.getNodeName()
+ " because it has reserved containers.");
}
return false;
- } else if (!Resources.fitsIn(getResourceUsage(), getMaxShare())) {
+ } else if (!Resources.fitsIn(getGuaranteedResourceUsage(), getMaxShare())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Assigning container failed on node '" + node.getNodeName()
+ " because queue resource usage is larger than MaxShare: "
@@ -447,7 +450,7 @@ public boolean isActive() {
@Override
public String toString() {
return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
- getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+ getName(), getDemand(), getGuaranteedResourceUsage(), fairShare, getWeight());
}
@Override
@@ -486,7 +489,7 @@ public Priority getDefaultApplicationPriority() {
boolean fitsInMaxShare(Resource additionalResource) {
Resource usagePlusAddition =
- Resources.add(getResourceUsage(), additionalResource);
+ Resources.add(getGuaranteedResourceUsage(), additionalResource);
if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
if (LOG.isDebugEnabled()) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 37f583e..7f5deec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -164,7 +164,6 @@
private float reservableNodesRatio; // percentage of available nodes
// an app can be reserved on
-
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
// Continuous Scheduling enabled or not
protected boolean continuousSchedulingEnabled;
@@ -184,6 +183,8 @@
boolean maxAssignDynamic;
protected int maxAssign; // Max containers to assign per heartbeat
+ protected boolean oversubscriptionEnabled;
+
@VisibleForTesting
final MaxRunningAppsEnforcer maxRunningEnforcer;
@@ -954,13 +955,13 @@ private boolean shouldContinueAssigning(int containers,
* resources for preempted containers.
* @param node Node to check
*/
- static void assignPreemptedContainers(FSSchedulerNode node) {
+ static void attemptToAssignPreemptedResources(FSSchedulerNode node) {
for (Entry<FSAppAttempt, Resource> entry :
node.getPreemptionList().entrySet()) {
FSAppAttempt app = entry.getKey();
Resource preemptionPending = Resources.clone(entry.getValue());
while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
- Resource assigned = app.assignContainer(node);
+ Resource assigned = app.assignContainer(node, false);
if (Resources.isNone(assigned) ||
assigned.equals(FairScheduler.CONTAINER_RESERVED)) {
// Fail to assign, let's not try further
@@ -992,44 +993,84 @@ void attemptScheduling(FSSchedulerNode node) {
// Assign new containers...
// 1. Ensure containers are assigned to the apps that preempted
// 2. Check for reserved applications
- // 3. Schedule if there are no reservations
+ // 3. Schedule GUARANTEED containers if there are no reservations
+ // 4. Schedule OPPORTUNISTIC containers if possible
// Apps may wait for preempted containers
// We have to satisfy these first to avoid cases, when we preempt
// a container for A from B and C gets the preempted containers,
// when C does not qualify for preemption itself.
- assignPreemptedContainers(node);
- FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
- boolean validReservation = false;
- if (reservedAppSchedulable != null) {
- validReservation = reservedAppSchedulable.assignReservedContainer(node);
- }
+ attemptToAssignPreemptedResources(node);
+
+ boolean validReservation = attemptToAssignReservedResources(node);
if (!validReservation) {
- // No reservation, schedule at queue which is farthest below fair share
- int assignedContainers = 0;
- Resource assignedResource = Resources.clone(Resources.none());
- Resource maxResourcesToAssign = Resources.multiply(
- node.getUnallocatedResource(), 0.5f);
- while (node.getReservedContainer() == null) {
- Resource assignment = queueMgr.getRootQueue().assignContainer(node);
- if (assignment.equals(Resources.none())) {
- break;
- }
+      // only attempt to assign GUARANTEED containers if there is no
+      // reservation on the node, because reserved resources must be
+      // honored before new GUARANTEED containers are placed on the node
+ attemptToAssignResourcesAsGuaranteedContainers(node);
+ }
- assignedContainers++;
- Resources.addTo(assignedResource, assignment);
- if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
- assignedResource)) {
- break;
- }
- }
+ // attempt to assign OPPORTUNISTIC containers regardless of whether
+ // we have made a reservation or assigned a GUARANTEED container
+ if (oversubscriptionEnabled) {
+ attemptToAssignResourcesAsOpportunisticContainers(node);
}
+
updateRootQueueMetrics();
} finally {
writeLock.unlock();
}
}
+  /**
+   * Assign the reserved resources to the application that has reserved them.
+   * @return true if the reserved resources were successfully assigned
+   */
+ private boolean attemptToAssignReservedResources(FSSchedulerNode node) {
+ boolean success = false;
+ FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+ if (reservedAppSchedulable != null) {
+ success = reservedAppSchedulable.assignReservedContainer(node);
+ }
+ return success;
+ }
+
+ private void attemptToAssignResourcesAsGuaranteedContainers(
+ FSSchedulerNode node) {
+ // No reservation, schedule at queue which is farthest below fair share
+ int assignedContainers = 0;
+ Resource assignedResource = Resources.clone(Resources.none());
+ Resource maxResourcesToAssign = Resources.multiply(
+ node.getUnallocatedResource(), 0.5f);
+ while (node.getReservedContainer() == null) {
+ Resource assignment =
+ queueMgr.getRootQueue().assignContainer(node, false);
+ if (assignment.equals(Resources.none())) {
+ break;
+ }
+      // Note: the assignment may be CONTAINER_RESERVED; it still counts
+      // towards the per-heartbeat assignment limit below.
+ assignedContainers++;
+ Resources.addTo(assignedResource, assignment);
+
+ if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
+ assignedResource)) {
+ break;
+ }
+ }
+ }
+
+ /**
+   * Try to assign OPPORTUNISTIC containers as long as there are resources
+   * available for overallocation on the node.
+ * @param node the node to assign OPPORTUNISTIC containers on
+ */
+ private void attemptToAssignResourcesAsOpportunisticContainers(
+ FSSchedulerNode node) {
+ while (!Resources.none().equals(
+ queueMgr.getRootQueue().assignContainer(node, true))) {
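+      // keep offering the node until the root queue assigns nothing more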
+ }
+ }
+
public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
return super.getApplicationAttempt(appAttemptId);
}
@@ -1269,6 +1310,7 @@ private void initScheduler(Configuration conf) throws IOException {
sizeBasedWeight = this.conf.getSizeBasedWeight();
usePortForNodeName = this.conf.getUsePortForNodeName();
reservableNodesRatio = this.conf.getReservableNodes();
+ oversubscriptionEnabled = this.conf.isOversubscriptionEnabled();
updateInterval = this.conf.getUpdateInterval();
if (updateInterval < 0) {
@@ -1636,7 +1678,7 @@ private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app,
}
// maxShare
- if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), consumption),
+ if (!Resources.fitsIn(Resources.add(cur.getGuaranteedResourceUsage(), consumption),
cur.getMaxShare())) {
throw new YarnException("Moving app attempt " + appAttId + " to queue "
+ queueName + " would violate queue maxShare constraints on"
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index 9c9eee6..79aab9a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -223,6 +223,11 @@ public int getContinuousSchedulingSleepMs() {
return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
}
+ public boolean isOversubscriptionEnabled() {
+ return getBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED);
+ }
+
public long getLocalityDelayNodeMs() {
return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
}
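
A minimal sketch of flipping the new switch programmatically, e.g. in a test
(only the constant added in this patch is assumed; setBoolean/getBoolean are
the stock Hadoop Configuration API):

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class EnableOversubscriptionSketch {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Oversubscription is off by default; opt in explicitly.
        conf.setBoolean(
            YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true);
        System.out.println(conf.getBoolean(
            YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED));
      }
    }
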
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
index bd1ff7a..f018929 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
@@ -58,8 +58,17 @@
*/
Resource getDemand();
- /** Get the aggregate amount of resources consumed by the schedulable. */
- Resource getResourceUsage();
+ /**
+ * Get the aggregate amount of guaranteed resources consumed by the
+ * schedulable.
+ */
+ Resource getGuaranteedResourceUsage();
+
+ /**
+ * Get the aggregate amount of opportunistic resources consumed by the
+ * schedulable.
+ */
+ Resource getOpportunisticResourceUsage();
/** Minimum Resource share assigned to the schedulable. */
Resource getMinShare();
@@ -89,8 +98,10 @@
/**
* Assign a container on this node if possible, and return the amount of
* resources assigned.
+ * @param node the node to assign containers on
+ * @param opportunistic whether to assign OPPORTUNISTIC containers or not
*/
- Resource assignContainer(FSSchedulerNode node);
+ Resource assignContainer(FSSchedulerNode node, boolean opportunistic);
/** Get the fair share assigned to this Schedulable. */
Resource getFairShare();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index e58b357..4977f8b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -126,8 +126,8 @@ public void setFSContext(FSContext fsContext) {
@Override
public int compare(Schedulable s1, Schedulable s2) {
ResourceInformation[] info = ResourceUtils.getResourceTypesArray();
- Resource usage1 = s1.getResourceUsage();
- Resource usage2 = s2.getResourceUsage();
+ Resource usage1 = s1.getGuaranteedResourceUsage();
+ Resource usage2 = s2.getGuaranteedResourceUsage();
Resource minShare1 = s1.getMinShare();
Resource minShare2 = s2.getMinShare();
Resource clusterCapacity = fsContext.getClusterResource();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 8179aa7..7ecbeea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -90,8 +90,8 @@ public int compare(Schedulable s1, Schedulable s2) {
int res = compareDemand(s1, s2);
// Pre-compute resource usages to avoid duplicate calculation
- Resource resourceUsage1 = s1.getResourceUsage();
- Resource resourceUsage2 = s2.getResourceUsage();
+ Resource resourceUsage1 = s1.getGuaranteedResourceUsage();
+ Resource resourceUsage2 = s2.getGuaranteedResourceUsage();
if (res == 0) {
res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
index 913513c..fe313a6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
@@ -88,7 +88,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
amMaxResources = new ResourceInfo(Resource.newInstance(
queue.getMetrics().getMaxAMShareMB(),
queue.getMetrics().getMaxAMShareVCores()));
- usedResources = new ResourceInfo(queue.getResourceUsage());
+ usedResources = new ResourceInfo(queue.getGuaranteedResourceUsage());
demandResources = new ResourceInfo(queue.getDemand());
fractionMemUsed = (float)usedResources.getMemorySize() /
clusterResources.getMemorySize();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 9325efc..26ba514 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -99,7 +99,7 @@ public static Resource newAvailResource(Resource total, Resource used) {
return rs;
}
- private static class MockRMNodeImpl implements RMNode {
+ public static class MockRMNodeImpl implements RMNode {
private NodeId nodeId;
private String hostName;
private String nodeAddr;
@@ -114,12 +114,26 @@ public static Resource newAvailResource(Resource total, Resource used) {
private ResourceUtilization containersUtilization;
private ResourceUtilization nodeUtilization;
private Resource physicalResource;
+ private OverAllocationInfo overAllocationInfo;
+  private List<UpdatedContainerInfo> containerUpdates =
+      Collections.emptyList();
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport,
long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
+ this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport,
+ lastHealthReportTime, cmdPort, hostName, state, labels,
+ containersUtilization, nodeUtilization, pPhysicalResource, null);
+ }
+
+ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
+ Resource perNode, String rackName, String healthReport,
+ long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
+      Set<String> labels, ResourceUtilization containersUtilization,
+ ResourceUtilization nodeUtilization, Resource pPhysicalResource,
+ OverAllocationInfo overAllocationInfo) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
this.httpAddress = httpAddress;
@@ -134,6 +148,7 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
this.containersUtilization = containersUtilization;
this.nodeUtilization = nodeUtilization;
this.physicalResource = pPhysicalResource;
+ this.overAllocationInfo = overAllocationInfo;
}
@Override
@@ -221,7 +236,7 @@ public String getNodeManagerVersion() {
@Override
public List<UpdatedContainerInfo> pullContainerUpdates() {
-    return new ArrayList<UpdatedContainerInfo>();
+ return containerUpdates;
}
@Override
@@ -265,7 +280,7 @@ public ResourceUtilization getNodeUtilization() {
@Override
public OverAllocationInfo getOverAllocationInfo() {
- return null;
+ return this.overAllocationInfo;
}
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
@@ -290,6 +305,20 @@ public Integer getDecommissioningTimeout() {
public Resource getPhysicalResource() {
return this.physicalResource;
}
+
+ public void updateResourceUtilization(ResourceUtilization utilization) {
+ this.nodeUtilization = utilization;
+ }
+
+ public void updateContainersAndNodeUtilization(
+ UpdatedContainerInfo updatedContainerInfo,
+ ResourceUtilization resourceUtilization) {
+ if (updatedContainerInfo != null) {
+ containerUpdates = new ArrayList<>(1);
+ containerUpdates.add(updatedContainerInfo);
+ }
+ this.nodeUtilization = resourceUtilization;
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode,
@@ -313,6 +342,15 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName, int port,
Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization, Resource physicalResource) {
+ return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port,
+ labels, containersUtilization, nodeUtilization, physicalResource, null);
+ }
+
+ private static MockRMNodeImpl buildRMNode(int rack, final Resource perNode,
+ NodeState state, String httpAddr, int hostnum, String hostName, int port,
+      Set<String> labels, ResourceUtilization containersUtilization,
+ ResourceUtilization nodeUtilization, Resource physicalResource,
+ OverAllocationInfo overAllocationInfo) {
final String rackName = "rack"+ rack;
final int nid = hostnum;
final String nodeAddr = hostName + ":" + nid;
@@ -325,9 +363,8 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
rackName, healthReport, 0, nid, hostName, state, labels,
- containersUtilization, nodeUtilization, physicalResource);
+ containersUtilization, nodeUtilization, physicalResource, overAllocationInfo);
}
-
public static RMNode nodeInfo(int rack, final Resource perNode,
NodeState state) {
return buildRMNode(rack, perNode, state, "N/A");
@@ -356,4 +393,10 @@ public static RMNode newNodeInfo(int rack, final Resource perNode,
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port);
}
+ public static MockRMNodeImpl newNodeInfo(int rack, final Resource perNode,
+ OverAllocationInfo overAllocationInfo) {
+ return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0",
+ NODE_ID++, null, 123, null, ResourceUtilization.newInstance(0, 0, 0.0f),
+ ResourceUtilization.newInstance(0, 0, 0.0f), null, overAllocationInfo);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index b0da5df..a117eed 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -478,7 +478,7 @@ private void checkFSQueue(ResourceManager rm,
FSParentQueue root = scheduler.getQueueManager().getRootQueue();
// ************ check cluster used Resources ********
assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
- assertEquals(usedResources,root.getResourceUsage());
+    assertEquals(usedResources, root.getGuaranteedResourceUsage());
// ************ check app headroom ****************
FSAppAttempt schedulerAttempt =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index 03332b2..6adc63c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -82,7 +82,7 @@ public FakeSchedulable(Resource minShare, Resource maxShare,
}
@Override
- public Resource assignContainer(FSSchedulerNode node) {
+ public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
return null;
}
@@ -112,11 +112,16 @@ public Priority getPriority() {
}
@Override
- public Resource getResourceUsage() {
+ public Resource getGuaranteedResourceUsage() {
return usage;
}
@Override
+ public Resource getOpportunisticResourceUsage() {
+ return Resource.newInstance(0, 0);
+ }
+
+ @Override
public long getStartTime() {
return startTime;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
index f581935..6d0af47 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
@@ -222,7 +222,7 @@ public void testMoveRunnableApp() throws Exception {
scheduler.handle(nodeEvent);
scheduler.handle(updateEvent);
- assertEquals(Resource.newInstance(1024, 1), oldQueue.getResourceUsage());
+ assertEquals(Resource.newInstance(1024, 1), oldQueue.getGuaranteedResourceUsage());
scheduler.update();
assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand());
@@ -231,8 +231,8 @@ public void testMoveRunnableApp() throws Exception {
assertSame(targetQueue, app.getQueue());
assertFalse(oldQueue.isRunnableApp(app));
assertTrue(targetQueue.isRunnableApp(app));
- assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage());
- assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage());
+ assertEquals(Resource.newInstance(0, 0), oldQueue.getGuaranteedResourceUsage());
+ assertEquals(Resource.newInstance(1024, 1), targetQueue.getGuaranteedResourceUsage());
assertEquals(0, oldQueue.getNumRunnableApps());
assertEquals(1, targetQueue.getNumRunnableApps());
assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 46187d9..39c4d87 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -223,7 +223,7 @@ public void testHeadroom() {
Mockito.when(mockQueue.getMaxShare()).thenReturn(queueMaxResources);
Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
- Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
+ Mockito.when(mockQueue.getGuaranteedResourceUsage()).thenReturn(queueUsage);
Mockito.when(mockScheduler.getClusterResource()).thenReturn
(clusterResource);
Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
@@ -305,7 +305,7 @@ public void testHeadroomWithBlackListedNodes() {
getApplicationId()));
FSAppAttempt app = scheduler.getSchedulerApp(id11);
assertNotNull(app);
- Resource queueUsage = app.getQueue().getResourceUsage();
+ Resource queueUsage = app.getQueue().getGuaranteedResourceUsage();
assertEquals(0, queueUsage.getMemorySize());
assertEquals(0, queueUsage.getVirtualCores());
SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 0bba35d..9438c85 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -175,7 +175,7 @@ public void run() {
@Override
public void run() {
for (int i=0; i < 500; i++) {
- schedulable.getResourceUsage();
+ schedulable.getGuaranteedResourceUsage();
}
}
});
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
index 6726f17..f79ba4c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
@@ -98,7 +98,7 @@ private FSAppAttempt createStarvingApp(FSSchedulerNode schedulerNode,
ApplicationAttemptId appAttemptId =
mock(ApplicationAttemptId.class);
when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId);
- when(starvingApp.assignContainer(schedulerNode)).thenAnswer(
+ when(starvingApp.assignContainer(schedulerNode, false)).thenAnswer(
new Answer() {
@Override
public Resource answer(InvocationOnMock invocationOnMock)
@@ -142,7 +142,7 @@ private void finalValidation(FSSchedulerNode schedulerNode) {
}
private void allocateContainers(FSSchedulerNode schedulerNode) {
- FairScheduler.assignPreemptedContainers(schedulerNode);
+ FairScheduler.attemptToAssignPreemptedResources(schedulerNode);
}
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index e1f155e..a3f6100 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -55,13 +55,19 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -71,6 +77,8 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -92,6 +100,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -1054,15 +1063,15 @@ public void testSimpleContainerAllocation() throws IOException {
assertEquals(
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
assertEquals(2, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getVirtualCores());
+ getGuaranteedResourceUsage().getVirtualCores());
// verify metrics
QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
@@ -1097,7 +1106,7 @@ public void testSimpleContainerReservation() throws Exception {
// Make sure queue 1 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Now queue 2 requests likewise
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@@ -1107,7 +1116,7 @@ public void testSimpleContainerReservation() throws Exception {
// Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
// Now another node checks in with capacity
@@ -1121,7 +1130,7 @@ public void testSimpleContainerReservation() throws Exception {
// Make sure this goes to queue 2
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// The old reservation should still be there...
assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
@@ -1131,7 +1140,7 @@ public void testSimpleContainerReservation() throws Exception {
}
- @Test (timeout = 5000)
+ @Test
public void testOffSwitchAppReservationThreshold() throws Exception {
conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
scheduler.init(conf);
@@ -1171,7 +1180,7 @@ public void testOffSwitchAppReservationThreshold() throws Exception {
// Verify capacity allocation
assertEquals(6144, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Create new app with a resource request that can be satisfied by any
// node but would be
@@ -1203,7 +1212,7 @@ public void testOffSwitchAppReservationThreshold() throws Exception {
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node4));
assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -1264,7 +1273,7 @@ public void testRackLocalAppReservationThreshold() throws Exception {
// Verify capacity allocation
assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Create new app with a resource request that can be satisfied by any
// node but would be
@@ -1309,7 +1318,7 @@ public void testRackLocalAppReservationThreshold() throws Exception {
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node4));
assertEquals(10240, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -1353,7 +1362,7 @@ public void testReservationThresholdWithAssignMultiple() throws Exception {
// Verify capacity allocation
assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Verify number of reservations have decremented
assertEquals(0,
@@ -1397,7 +1406,7 @@ public void testContainerReservationAttemptExceedingQueueMax()
// Make sure queue 1 is allocated app capacity
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Now queue 2 requests likewise
createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1406,7 +1415,7 @@ public void testContainerReservationAttemptExceedingQueueMax()
// Make sure queue 2 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
@@ -1532,7 +1541,7 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception {
// Make sure queue 1 is allocated app capacity
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Now queue 2 requests likewise
createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1541,7 +1550,7 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception {
// Make sure queue 2 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
@@ -1581,12 +1590,12 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception {
// Make sure allocated memory of queue1 doesn't exceed its maximum
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
//the reservation of queue1 should be reclaim
assertEquals(0, scheduler.getSchedulerApp(attId1).
getCurrentReservation().getMemorySize());
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
}
@Test
@@ -1626,7 +1635,7 @@ public void testReservationThresholdGatesReservations() throws Exception {
// Make sure queue 1 is allocated app capacity
assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Now queue 2 requests below threshold
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@@ -1635,7 +1644,7 @@ public void testReservationThresholdGatesReservations() throws Exception {
// Make sure queue 2 has no reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
assertEquals(0,
scheduler.getSchedulerApp(attId).getReservedContainers().size());
@@ -1646,7 +1655,7 @@ public void testReservationThresholdGatesReservations() throws Exception {
// Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
.getVirtualCores());
@@ -1661,7 +1670,7 @@ public void testReservationThresholdGatesReservations() throws Exception {
// Make sure this goes to queue 2
assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getVirtualCores());
+ getGuaranteedResourceUsage().getVirtualCores());
// The old reservation should still be there...
assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
@@ -2694,7 +2703,358 @@ public void testReservationWithMultiplePriorities() throws IOException {
2, liveContainers.iterator().next().getContainer().
getPriority().getPriority());
}
-
+
+ /**
+ * Test that no OPPORTUNISTIC containers can be allocated on a node that
+ * is fully allocated and running at very high utilization.
+ */
+ @Test
+ public void testAllocateNoOpportunisticContainersOnBusyNode()
+ throws IOException {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
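+ // (with both the increment and the minimum set to 1MB, requests below
+ // are not rounded up, so the assertions can check exact sizes; the
+ // other oversubscription tests in this class use the same setup)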
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 2G of memory and 2 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(2048, 2), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that takes up the node's full memory
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(2048, "queue1", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers1.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization shoots up after the container runs on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(2000, 0, 0.8f));
+
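+ // A rough sanity check of the expected behavior (informal arithmetic,
+ // not scheduler internals): the node reports 2000MB of its 2048MB in
+ // use, well above the 0.75 * 2048MB = 1536MB overallocation threshold,
+ // so nothing should be allocated opportunistically on top of it.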
+ // create another scheduling request
+ ApplicationAttemptId appAttempt2
+ = createSchedulingRequest(100, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertTrue("Expecting no containers allocated",
+ allocatedContainers2.isEmpty());
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+
+ // verify that a reservation is made for the second resource request
+ Resource reserved = scheduler.getNode(node.getNodeID()).
+ getReservedContainer().getReservedResource();
+ assertTrue("Expect a reservation made for the second resource request",
+ reserved.equals(Resource.newInstance(100, 1)));
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
+ /**
+ * Test that OPPORTUNISTIC containers can be allocated on a node with low
+ * utilization even though there are not enough unallocated resources on
+ * the node to accommodate the request.
+ */
+ @Test
+ public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode()
+ throws IOException {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation threshold
+ // of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that leaves some unallocated resources
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(3600, "queue1", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers1.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is low after the container is launched on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(1800, 0, 0.5f));
+
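+ // Informal arithmetic for the request below (assuming the threshold is
+ // applied to utilization over capacity): 0.75 * 4096MB = 3072MB may be
+ // used before overallocation stops; with 1800MB utilized, roughly
+ // 1272MB of oversubscribable headroom remain, enough for 1024MB.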
+ // create another scheduling request that asks for more than what's left
+ // unallocated on the node but can be served with overallocation.
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(1024, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers2.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers2.get(0).getExecutionType());
+
+ // verify that no reservation is made for the second request given
+ // that it's satisfied by an OPPORTUNISTIC container allocation.
+ assertNull("No reservation should be made because we have satisfied" +
+ " the second request with an OPPORTUNISTIC container allocation",
+ scheduler.getNode(node.getNodeID()).getReservedContainer());
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
+ /**
+ * Test that OPPORTUNISTIC containers can be allocated on a node that is
+ * fully allocated but whose utilization is very low.
+ */
+ @Test
+ public void testAllocateOpportunisticContainersOnFullyAllocatedNode()
+ throws IOException {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation threshold
+ // of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that takes up the whole node
+ ApplicationAttemptId appAttempt1 = createSchedulingRequest(
+ 4096, "queue1", "user1", 4);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers1.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is low after the container is launched on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(1800, 0, 0.5f));
+
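+ // Informal arithmetic: utilization is 1800MB of 4096MB (~44%), below
+ // the 0.75 threshold of 3072MB, so ~1272MB of oversubscribable
+ // headroom remain even though all 4096MB are already allocated.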
+ // create another scheduling request. Now that there are no unallocated
+ // resources left on the node, the request should be served with an
+ // OPPORTUNISTIC container allocation.
+ ApplicationAttemptId appAttempt2 = createSchedulingRequest(
+ 1024, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers2.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers2.get(0).getExecutionType());
+
+ // verify that no reservation is made for the second request given
+ // that it's satisfied by an OPPORTUNISTIC container allocation.
+ assertNull("No reservation should be made because we have satisfied" +
+ " the second request with an OPPORTUNISTIC container allocation",
+ scheduler.getNode(node.getNodeID()).getReservedContainer());
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
+ /**
+ * Test that OPPORTUNISTIC containers can be allocated on a node with low
+ * utilization even though GUARANTEED containers are already allocated.
+ */
+ @Test
+ public void testAllocateOpportunisticContainersWithGuaranteedOnes()
+ throws Exception {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation threshold
+ // of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(3200, "queue1", "user1", 3);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(3200, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers1.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is low after the container is launched on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(512, 0, 0.1f));
+
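+ // Informal arithmetic for the two requests below: 4096MB - 3200MB
+ // leaves 896MB unallocated, so the 512MB request fits in a GUARANTEED
+ // container; the 1024MB request exceeds the remaining 384MB and must
+ // be served opportunistically, which the low utilization (512MB of
+ // 4096MB, well under the 0.75 threshold) permits.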
+ // create two other scheduling requests which in aggregate ask for more
+ // than what's left unallocated on the node.
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(512, "queue2", "user1", 1);
+ ApplicationAttemptId appAttempt3 =
+ createSchedulingRequest(1024, "queue3", "user1", 1);
+
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(512, scheduler.getQueueManager().getQueue("queue2").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers2.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers2.get(0).getExecutionType());
+
+ List<Container> allocatedContainers3 =
+ scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers3.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers3.get(0).getExecutionType());
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+ getOpportunisticResourceUsage().getMemorySize());
+
+ // verify that no reservation is made given that the second request should
+ // be satisfied by a GUARANTEED container allocation, the third by an
+ // OPPORTUNISTIC container allocation.
+ assertTrue("No reservation should be made.",
+ scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
@Test
public void testAclSubmitApplication() throws Exception {
// Set acl's
@@ -3684,7 +4044,7 @@ public void testMultipleCompletedEvent() throws Exception {
.createAbnormalContainerStatus(container.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.FINISHED);
- assertEquals(Resources.none(), app1.getResourceUsage());
+ assertEquals(Resources.none(), app1.getGuaranteedResourceUsage());
}
@Test
@@ -3784,7 +4144,7 @@ public void testQueueMaxAMShare() throws Exception {
assertEquals("Application1's AM should be finished",
0, app1.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app1.getResourceUsage());
+ Resources.none(), app1.getGuaranteedResourceUsage());
assertEquals("Application3's AM should be running",
1, app3.getLiveContainers().size());
assertEquals("Application3's AM requests 1024 MB memory",
@@ -3804,7 +4164,7 @@ public void testQueueMaxAMShare() throws Exception {
assertEquals("Application4's AM should not be running",
0, app4.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app4.getResourceUsage());
+ Resources.none(), app4.getGuaranteedResourceUsage());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
2048, queue1.getAmResourceUsage().getMemorySize());
@@ -3820,7 +4180,7 @@ public void testQueueMaxAMShare() throws Exception {
assertEquals("Application5's AM should not be running",
0, app5.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app5.getResourceUsage());
+ Resources.none(), app5.getGuaranteedResourceUsage());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
2048, queue1.getAmResourceUsage().getMemorySize());
@@ -3833,7 +4193,7 @@ public void testQueueMaxAMShare() throws Exception {
assertEquals("Application5's AM should not be running",
0, app5.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app5.getResourceUsage());
+ Resources.none(), app5.getGuaranteedResourceUsage());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
2048, queue1.getAmResourceUsage().getMemorySize());
@@ -3849,11 +4209,11 @@ public void testQueueMaxAMShare() throws Exception {
assertEquals("Application2's AM should be finished",
0, app2.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app2.getResourceUsage());
+ Resources.none(), app2.getGuaranteedResourceUsage());
assertEquals("Application3's AM should be finished",
0, app3.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app3.getResourceUsage());
+ Resources.none(), app3.getGuaranteedResourceUsage());
assertEquals("Application5's AM should be running",
1, app5.getLiveContainers().size());
assertEquals("Application5's AM requests 2048 MB memory",
@@ -3874,7 +4234,7 @@ public void testQueueMaxAMShare() throws Exception {
assertEquals("Application5's AM should have 0 container",
0, app5.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app5.getResourceUsage());
+ Resources.none(), app5.getGuaranteedResourceUsage());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
2048, queue1.getAmResourceUsage().getMemorySize());
scheduler.update();
@@ -3898,7 +4258,7 @@ public void testQueueMaxAMShare() throws Exception {
assertEquals("Application6's AM should not be running",
0, app6.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app6.getResourceUsage());
+ Resources.none(), app6.getGuaranteedResourceUsage());
assertEquals("Application6's AM resource shouldn't be updated",
0, app6.getAMResource().getMemorySize());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
@@ -4627,7 +4987,7 @@ public void testMoveWouldViolateMaxResourcesConstraints() throws Exception {
scheduler.handle(updateEvent);
scheduler.handle(updateEvent);
- assertEquals(Resource.newInstance(2048, 2), oldQueue.getResourceUsage());
+ assertEquals(Resource.newInstance(2048, 2), oldQueue.getGuaranteedResourceUsage());
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
}
@@ -5051,7 +5411,7 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception {
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
//container will be reserved at node1
RMContainer reservedContainer1 =
@@ -5071,7 +5431,7 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception {
app1, RMAppAttemptState.KILLED, false));
assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// container will be allocated at node2
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -5276,7 +5636,7 @@ public void testDumpState() throws IOException {
child1.setMaxShare(new ConfigurableResource(resource));
FSAppAttempt app = mock(FSAppAttempt.class);
Mockito.when(app.getDemand()).thenReturn(resource);
- Mockito.when(app.getResourceUsage()).thenReturn(resource);
+ Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(resource);
child1.addApp(app, true);
child1.updateDemand();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index b016c1b..1e4f05e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -243,11 +243,16 @@ public Resource getDemand() {
}
@Override
- public Resource getResourceUsage() {
+ public Resource getGuaranteedResourceUsage() {
return usage;
}
@Override
+ public Resource getOpportunisticResourceUsage() {
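+ // the fake schedulable used by this test tracks no OPPORTUNISTIC usage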
+ return Resource.newInstance(0, 0);
+ }
+
+ @Override
public Resource getMinShare() {
return minShare;
}
@@ -278,7 +283,7 @@ public void updateDemand() {
}
@Override
- public Resource assignContainer(FSSchedulerNode node) {
+ public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
throw new UnsupportedOperationException();
}