diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
index a5aee74..1f4e7c7 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
@@ -75,7 +75,7 @@ private long getMemorySize(Schedulable schedulable, Metric metric) {
     case DEMAND:
       return schedulable.getDemand().getMemorySize();
     case USAGE:
-      return schedulable.getResourceUsage().getMemorySize();
+      return schedulable.getGuaranteedResourceUsage().getMemorySize();
     case MINSHARE:
       return schedulable.getMinShare().getMemorySize();
     case MAXSHARE:
@@ -96,7 +96,7 @@ private int getVirtualCores(Schedulable schedulable, Metric metric) {
     case DEMAND:
       return schedulable.getDemand().getVirtualCores();
     case USAGE:
-      return schedulable.getResourceUsage().getVirtualCores();
+      return schedulable.getGuaranteedResourceUsage().getVirtualCores();
     case MINSHARE:
       return schedulable.getMinShare().getVirtualCores();
     case MAXSHARE:
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f9c8b69..68c7acf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -266,6 +266,11 @@ private static void addDeprecatedKeys() {
   /** UserGroupMappingPlacementRule configuration string. */
   public static final String USER_GROUP_PLACEMENT_RULE = "user-group";
 
+  public static final String RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED =
+      RM_PREFIX + "scheduler.oversubscription.enabled";
+  public static final boolean DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED
+      = false;
+
   /** Enable Resource Manager webapp ui actions */
   public static final String RM_WEBAPP_UI_ACTIONS_ENABLED =
       RM_PREFIX + "webapp.ui-actions.enabled";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 89d4d1f..09fe4a5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -381,6 +381,19 @@
 
+  <property>
+    <description>
+    If set to true, the scheduler will try to over-allocate resources on the
+    nodes that allow overallocation. To enable overallocation on a node, set
+    {code}yarn.nodemanager.overallocation.memory-utilization-threshold{code}
+    and
+    {code}yarn.nodemanager.overallocation.cpu-utilization-threshold{code}
+    to a number in the range (0.0, 1.0).
+    </description>
+    <name>yarn.resourcemanager.scheduler.oversubscription.enabled</name>
+    <value>false</value>
+  </property>
+
   <property>
     <description>Enable RM to recover state after starting. If true, then
       yarn.resourcemanager.store.class must be specified.</description>
     <name>yarn.resourcemanager.recovery.enabled</name>
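For reviewers who want to try the switch out, here is a minimal, hypothetical sketch of enabling it programmatically. The RM-side constant comes from the YarnConfiguration hunk above; the two node-manager threshold keys are quoted from the yarn-default.xml description and are NM-side settings not defined in this patch, so they appear as string literals.

```java
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class OversubscriptionConfigSketch {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // RM-side switch introduced by this patch; defaults to false.
    conf.setBoolean(
        YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true);
    // NM-side thresholds named in the description above; both must fall in
    // (0.0, 1.0) for a node to advertise overallocation to the RM.
    conf.setFloat(
        "yarn.nodemanager.overallocation.memory-utilization-threshold", 0.8f);
    conf.setFloat(
        "yarn.nodemanager.overallocation.cpu-utilization-threshold", 0.8f);
    System.out.println("oversubscription enabled: " + conf.getBoolean(
        YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, false));
  }
}
```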
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 349944e..4cfe280 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -39,6 +39,8 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -59,6 +61,10 @@
   private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
 
   private Resource capacity;
 
+  // The resource available within the node's capacity that can be given out
+  // to run GUARANTEED containers, including reserved, preempted and any
+  // remaining free resources. Resources allocated to OPPORTUNISTIC containers
+  // are tracked in allocatedResourceOpportunistic
   private Resource unallocatedResource = Resource.newInstance(0, 0);
 
   private RMContainer reservedContainer;
@@ -596,6 +602,48 @@ public ResourceUtilization getNodeUtilization() {
     return this.nodeUtilization;
   }
 
+  /**
+   * Get the amount of resources that can be allocated to opportunistic
+   * containers in the case of overallocation. It is calculated as
+   * the node's overallocation threshold times its capacity, minus
+   * (node utilization + resources of allocated-yet-not-started containers).
+   * @return the amount of resources that are available to be allocated to
+   *         opportunistic containers
+   */
+  public synchronized Resource allowedResourceForOverAllocation() {
+    OverAllocationInfo overAllocationInfo = rmNode.getOverAllocationInfo();
+    if (overAllocationInfo == null) {
+      LOG.debug("Overallocation is disabled on node: " + rmNode.getHostName());
+      return Resources.none();
+    }
+
+    ResourceUtilization projectedNodeUtilization = ResourceUtilization.
+        newInstance(getNodeUtilization());
+    // account for resources allocated in this heartbeat
+    projectedNodeUtilization.addTo(
+        (int) (resourceAllocatedPendingLaunch.getMemorySize()), 0,
+        (float) resourceAllocatedPendingLaunch.getVirtualCores() /
+            capacity.getVirtualCores());
+
+    ResourceThresholds thresholds =
+        overAllocationInfo.getOverAllocationThresholds();
+    Resource overAllocationThreshold = Resources.createResource(
+        (long) (capacity.getMemorySize() * thresholds.getMemoryThreshold()),
+        (int) (capacity.getVirtualCores() * thresholds.getCpuThreshold()));
+    long allowedMemory = Math.max(0, overAllocationThreshold.getMemorySize()
+        - projectedNodeUtilization.getPhysicalMemory());
+    int allowedCpu = Math.max(0, (int)
+        (overAllocationThreshold.getVirtualCores() -
+            projectedNodeUtilization.getCPU() * capacity.getVirtualCores()));
+
+    Resource resourceAllowedForOpportunisticContainers =
+        Resources.createResource(allowedMemory, allowedCpu);
+
+    // TODO cap the resources allocated to OPPORTUNISTIC containers on a node
+    // in terms of its capacity. i.e. return min(max_ratio * capacity, allowed)
+    return resourceAllowedForOpportunisticContainers;
+  }
+
   private static class ContainerInfo {
     private final RMContainer container;
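To make the arithmetic in allowedResourceForOverAllocation() concrete, a standalone sketch with made-up numbers: a 16384 MB / 8 vcore node with 0.75 thresholds, 6144 MB and 25% CPU currently utilized, plus one 1024 MB / 1 vcore container allocated but not yet launched.

```java
public class OverAllocationMath {
  public static void main(String[] args) {
    long capacityMb = 16384;
    int capacityVcores = 8;
    float memThreshold = 0.75f, cpuThreshold = 0.75f;

    // Projected utilization: current utilization plus containers allocated
    // in this heartbeat that have not started yet.
    long utilizedMb = 6144 + 1024;                    // physical memory in use
    float utilizedCpu = 0.25f + 1f / capacityVcores;  // fraction of the node

    long thresholdMb = (long) (capacityMb * memThreshold);        // 12288
    int thresholdVcores = (int) (capacityVcores * cpuThreshold);  // 6

    long allowedMb = Math.max(0, thresholdMb - utilizedMb);       // 5120
    int allowedVcores = Math.max(0,
        (int) (thresholdVcores - utilizedCpu * capacityVcores));  // 3

    System.out.println(allowedMb + " MB and " + allowedVcores
        + " vcores may be handed to OPPORTUNISTIC containers");
  }
}
```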
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 157d264..a15cce9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -168,7 +169,7 @@ void containerCompleted(RMContainer rmContainer,
         rmContainer.getNodeLabelExpression(), getUser(), 1, containerResource);
     this.attemptResourceUsage.decUsed(containerResource);
-    getQueue().decUsedResource(containerResource);
+    getQueue().decUsedGuaranteedResource(containerResource);
 
     // Clear resource utilization metrics cache.
     lastMemoryAggregateAllocationUpdateTime = -1;
@@ -177,30 +178,35 @@ void containerCompleted(RMContainer rmContainer,
     }
   }
 
-  private void unreserveInternal(
+  private boolean unreserveInternal(
       SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
     try {
       writeLock.lock();
       Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
           schedulerKey);
-      RMContainer reservedContainer = reservedContainers.remove(
-          node.getNodeID());
-      if (reservedContainers.isEmpty()) {
-        this.reservedContainers.remove(schedulerKey);
-      }
+      boolean unreserved = false;
+      if (reservedContainers != null) {
+        RMContainer reservedContainer = reservedContainers.remove(
+            node.getNodeID());
+        if (reservedContainers.isEmpty()) {
+          this.reservedContainers.remove(schedulerKey);
+        }
 
-      // Reset the re-reservation count
-      resetReReservations(schedulerKey);
+        // Reset the re-reservation count
+        resetReReservations(schedulerKey);
 
-      Resource resource = reservedContainer.getContainer().getResource();
-      this.attemptResourceUsage.decReserved(resource);
+        Resource resource = reservedContainer.getContainer().getResource();
+        this.attemptResourceUsage.decReserved(resource);
+        unreserved = true;
 
-      LOG.info(
-          "Application " + getApplicationId() + " unreserved " + " on node "
-              + node + ", currently has " + reservedContainers.size()
-              + " at priority " + schedulerKey.getPriority()
-              + "; currentReservation " + this.attemptResourceUsage
-              .getReserved());
+        LOG.info(
+            "Application " + getApplicationId() + " unreserved " + " on node "
+                + node + ", currently has " + reservedContainers.size()
+                + " at priority " + schedulerKey.getPriority()
+                + "; currentReservation " + this.attemptResourceUsage
+                .getReserved());
+      }
+      return unreserved;
     } finally {
       writeLock.unlock();
     }
@@ -228,7 +234,7 @@ public Resource getHeadroom() {
     SchedulingPolicy policy = fsQueue.getPolicy();
 
     Resource queueFairShare = fsQueue.getFairShare();
-    Resource queueUsage = fsQueue.getResourceUsage();
+    Resource queueUsage = fsQueue.getGuaranteedResourceUsage();
     Resource clusterResource = this.scheduler.getClusterResource();
     Resource clusterUsage = this.scheduler.getRootQueueMetrics()
         .getAllocatedResources();
@@ -419,7 +425,7 @@ NodeType getAllowedLocalityLevelByTime(
 
   public RMContainer allocate(NodeType type, FSSchedulerNode node,
       SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
-      Container reservedContainer) {
+      Container reservedContainer, boolean opportunistic) {
     RMContainer rmContainer;
     Container container;
 
@@ -444,9 +450,11 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
     }
 
     container = reservedContainer;
+    ExecutionType executionType = opportunistic ?
+        ExecutionType.OPPORTUNISTIC : ExecutionType.GUARANTEED;
     if (container == null) {
       container = createContainer(node, pendingAsk.getPerAllocationResource(),
-          schedulerKey);
+          schedulerKey, executionType);
     }
 
     // Create RMContainer
@@ -462,8 +470,12 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
         type, node, schedulerKey, container);
-    this.attemptResourceUsage.incUsed(container.getResource());
-    getQueue().incUsedResource(container.getResource());
+    if (executionType.equals(ExecutionType.GUARANTEED)) {
+      this.attemptResourceUsage.incUsed(container.getResource());
+      getQueue().incUsedGuaranteedResource(container.getResource());
+    } else {
+      this.attemptOpportunisticResourceUsage.incUsed(container.getResource());
+    }
 
     // Update resource requests related to "request" and store in RMContainer
     ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
@@ -610,7 +622,7 @@ boolean canContainerBePreempted(RMContainer container) {
 
     // Check if the app's allocation will be over its fairshare even
     // after preempting this container
-    Resource usageAfterPreemption = Resources.clone(getResourceUsage());
+    Resource usageAfterPreemption = Resources.clone(getGuaranteedResourceUsage());
 
     // Subtract resources of containers already queued for preemption
     synchronized (preemptionVariablesLock) {
@@ -634,7 +646,7 @@ boolean canContainerBePreempted(RMContainer container) {
    * @return Container
    */
   private Container createContainer(FSSchedulerNode node, Resource capability,
-      SchedulerRequestKey schedulerKey) {
+      SchedulerRequestKey schedulerKey, ExecutionType executionType) {
     NodeId nodeId = node.getRMNode().getNodeID();
     ContainerId containerId = BuilderUtils.newContainerId(
@@ -644,7 +656,7 @@ private Container createContainer(FSSchedulerNode node, Resource capability,
     return BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability,
         schedulerKey.getPriority(), null,
-        schedulerKey.getAllocationRequestId());
+        executionType, schedulerKey.getAllocationRequestId());
   }
 
   @Override
@@ -656,7 +668,7 @@ public synchronized void recoverContainer(SchedulerNode node,
     super.recoverContainer(node, rmContainer);
 
     if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
-      getQueue().incUsedResource(rmContainer.getContainer().getResource());
+      getQueue().incUsedGuaranteedResource(rmContainer.getContainer().getResource());
     }
 
     // If not running unmanaged, the first container we recover is always
@@ -694,7 +706,7 @@ private boolean reserve(Resource perAllocationResource, FSSchedulerNode node,
 
     if (reservedContainer == null) {
       reservedContainer = createContainer(node, perAllocationResource,
-          schedulerKey);
+          schedulerKey, ExecutionType.GUARANTEED);
       getMetrics().reserveResource(node.getPartition(), getUser(),
           reservedContainer.getResource());
       RMContainer rmContainer =
@@ -751,11 +763,12 @@ private boolean reservationExceedsThreshold(FSSchedulerNode node,
   public void unreserve(SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    unreserveInternal(schedulerKey, node);
-    node.unreserveResource(this);
-    clearReservation(node);
-    getMetrics().unreserveResource(node.getPartition(),
-        getUser(), rmContainer.getContainer().getResource());
+    if (unreserveInternal(schedulerKey, node)) {
+      node.unreserveResource(this);
+      clearReservation(node);
+      getMetrics().unreserveResource(node.getPartition(),
+          getUser(), rmContainer.getContainer().getResource());
+    }
   }
 
   private void setReservation(SchedulerNode node) {
@@ -829,13 +842,15 @@ int getNumReservations(String rackName, boolean isAny) {
    */
   private Resource assignContainer(
       FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
-      boolean reserved, SchedulerRequestKey schedulerKey) {
+      boolean reserved, boolean opportunistic,
+      SchedulerRequestKey schedulerKey) {
 
     // How much does this request need?
     Resource capability = pendingAsk.getPerAllocationResource();
 
     // How much does the node have?
-    Resource available = node.getUnallocatedResource();
+    Resource available = opportunistic ? node.allowedResourceForOverAllocation() :
+        node.getUnallocatedResource();
 
     Container reservedContainer = null;
     if (reserved) {
@@ -847,33 +862,39 @@ private Resource assignContainer(
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
           allocate(type, node, schedulerKey, pendingAsk,
-              reservedContainer);
-      if (allocatedContainer == null) {
-        // Did the application need this resource?
-        if (reserved) {
-          unreserve(schedulerKey, node);
-        }
-        return Resources.none();
-      }
+              reservedContainer, opportunistic);
 
-      // If we had previously made a reservation, delete it
+      // delete the previous reservation, if any
       if (reserved) {
         unreserve(schedulerKey, node);
       }
 
-      // Inform the node
-      node.allocateContainer(allocatedContainer);
+      if (allocatedContainer != null) {
+        if (opportunistic) {
+          // if an OPPORTUNISTIC container is allocated, we need to
+          // unreserve anything that we may have reserved in our
+          // previous attempt to assign GUARANTEED containers for this
+          // scheduling request.
+          unreserve(schedulerKey, node);
+        }
+
-      // If not running unmanaged, the first container we allocate is always
-      // the AM. Set the amResource for this app and update the leaf queue's AM
-      // usage
-      if (!isAmRunning() && !getUnmanagedAM()) {
-        setAMResource(capability);
-        getQueue().addAMResourceUsage(capability);
-        setAmRunning(true);
-      }
+        // Inform the node
+        node.allocateContainer(allocatedContainer);
+
+        // If not running unmanaged, the first container we allocate
+        // is always the AM. Set amResource for this app and update
+        // the leaf queue's AM usage
+        if (!isAmRunning() && !getUnmanagedAM()) {
+          setAMResource(capability);
+          getQueue().addAMResourceUsage(capability);
+          setAmRunning(true);
+        }
 
-      return capability;
+        return capability;
+      } else {
+        return Resources.none();
+      }
     }
 
     if (LOG.isDebugEnabled()) {
@@ -884,7 +905,7 @@ private Resource assignContainer(
     // The desired container won't fit here, so reserve
     // Reserve only, if app does not wait for preempted resources on the node,
     // otherwise we may end up with duplicate reservations
-    if (isReservable(capability) &&
+    if (isReservable(capability) && !opportunistic &&
         !node.isPreemptedForApp(this) &&
         reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
             type, schedulerKey)) {
@@ -929,7 +950,8 @@ private boolean isOverAMShareLimit() {
     return false;
   }
 
-  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+  private Resource assignContainer(FSSchedulerNode node, boolean opportunistic,
+      boolean reserved) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
     }
@@ -992,7 +1014,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
             + ", app attempt id: " + this.attemptId);
       }
       return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
-          reserved, schedulerKey);
+          reserved, opportunistic, schedulerKey);
     }
 
     if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
@@ -1009,7 +1031,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
             + ", app attempt id: " + this.attemptId);
       }
       return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
-          reserved, schedulerKey);
+          reserved, opportunistic, schedulerKey);
     }
 
     PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
@@ -1029,7 +1051,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
             + ", app attempt id: " + this.attemptId);
       }
       return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
-          reserved, schedulerKey);
+          reserved, opportunistic, schedulerKey);
     }
   }
 
@@ -1130,7 +1152,7 @@ boolean assignReservedContainer(FSSchedulerNode node) {
     // there's only one container size per priority.
     if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
         node.getUnallocatedResource())) {
-      assignContainer(node, true);
+      assignContainer(node, false, true);
     }
     return true;
   }
@@ -1146,7 +1168,7 @@ Resource fairShareStarvation() {
     Resource fairDemand = Resources.componentwiseMin(threshold, demand);
 
     // Check if the queue is starved for fairshare
-    boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand);
+    boolean starved = isUsageBelowShare(getGuaranteedResourceUsage(), fairDemand);
 
     if (!starved) {
       lastTimeAtFairShare = now;
@@ -1159,7 +1181,7 @@ Resource fairShareStarvation() {
     } else {
       // The app has been starved for longer than preemption-timeout.
       fairshareStarvation =
-          Resources.subtractFromNonNegative(fairDemand, getResourceUsage());
+          Resources.subtractFromNonNegative(fairDemand, getGuaranteedResourceUsage());
     }
     return fairshareStarvation;
   }
@@ -1178,7 +1200,7 @@ private boolean isUsageBelowShare(Resource usage, Resource share) {
    * @return true if the app is starved for fairshare, false otherwise
    */
   boolean isStarvedForFairShare() {
-    return isUsageBelowShare(getResourceUsage(), getFairShare());
+    return isUsageBelowShare(getGuaranteedResourceUsage(), getFairShare());
   }
 
   /**
@@ -1279,7 +1301,7 @@ public Resource getDemand() {
    * Get the current app's unsatisfied demand.
    */
   Resource getPendingDemand() {
-    return Resources.subtract(demand, getResourceUsage());
+    return Resources.subtract(demand, getGuaranteedResourceUsage());
   }
 
   @Override
@@ -1298,11 +1320,16 @@ public Resource getMaxShare() {
   }
 
   @Override
-  public Resource getResourceUsage() {
+  public Resource getGuaranteedResourceUsage() {
     return getCurrentConsumption();
   }
 
   @Override
+  public Resource getOpportunisticResourceUsage() {
+    return attemptOpportunisticResourceUsage.getUsed();
+  }
+
+  @Override
   public float getWeight() {
     return scheduler.getAppWeight(this);
   }
@@ -1344,7 +1371,7 @@ public void updateDemand() {
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     if (isOverAMShareLimit()) {
       PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
       updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
@@ -1356,7 +1383,7 @@ public Resource assignContainer(FSSchedulerNode node) {
       }
       return Resources.none();
     }
-    return assignContainer(node, false);
+    return assignContainer(node, opportunistic, false);
   }
 
   /**
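A consequence of the switch to getGuaranteedResourceUsage() above is that OPPORTUNISTIC allocations never mask starvation: an app running mostly opportunistic containers still counts as starved for its fair share. A small standalone sketch with hypothetical numbers (fair-share threshold 8192 MB, demand 6144 MB, 2048 MB guaranteed plus 3072 MB opportunistic in use):

```java
public class StarvationMath {
  public static void main(String[] args) {
    // componentwise min of the fair-share threshold and the app's demand
    long fairDemandMb = Math.min(8192, 6144);   // 6144

    // only GUARANTEED usage counts; the 3072 MB of OPPORTUNISTIC
    // containers is deliberately ignored
    long guaranteedUsedMb = 2048;

    // mirrors Resources.subtractFromNonNegative(fairDemand, guaranteedUsage)
    long starvationMb = Math.max(0, fairDemandMb - guaranteedUsedMb);
    System.out.println("fairshare starvation: " + starvationMb + " MB"); // 4096
  }
}
```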
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 49d2166..4e80f2d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -89,7 +89,7 @@ void addApp(FSAppAttempt app, boolean runnable) {
       } else {
         nonRunnableApps.add(app);
       }
-      incUsedResource(app.getResourceUsage());
+      incUsedGuaranteedResource(app.getGuaranteedResourceUsage());
     } finally {
       writeLock.unlock();
     }
@@ -124,7 +124,7 @@ boolean removeApp(FSAppAttempt app) {
       getMetrics().setAMResourceUsage(amResourceUsage);
     }
 
-    decUsedResource(app.getResourceUsage());
+    decUsedGuaranteedResource(app.getGuaranteedResourceUsage());
     return runnable;
   }
 
@@ -294,6 +294,42 @@ public Resource getDemand() {
     return demand;
   }
 
+  @Override
+  public Resource getGuaranteedResourceUsage() {
+    Resource guaranteedResource = Resources.createResource(0);
+    readLock.lock();
+    try {
+      for (FSAppAttempt app : runnableApps) {
+        Resources.addTo(guaranteedResource, app.getGuaranteedResourceUsage());
+      }
+      for (FSAppAttempt app : nonRunnableApps) {
+        Resources.addTo(guaranteedResource, app.getGuaranteedResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return guaranteedResource;
+  }
+
+  @Override
+  public Resource getOpportunisticResourceUsage() {
+    Resource opportunisticResource = Resource.newInstance(0, 0);
+    readLock.lock();
+    try {
+      for (FSAppAttempt app : runnableApps) {
+        Resources.addTo(opportunisticResource,
+            app.getOpportunisticResourceUsage());
+      }
+      for (FSAppAttempt app : nonRunnableApps) {
+        Resources.addTo(opportunisticResource,
+            app.getOpportunisticResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return opportunisticResource;
+  }
+
   Resource getAmResourceUsage() {
     return amResourceUsage;
   }
@@ -327,14 +363,14 @@ public void updateDemand() {
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     Resource assigned = none();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
           getName() + " fairShare: " + getFairShare());
     }
 
-    if (!assignContainerPreCheck(node)) {
+    if (!assignContainerPreCheck(node, opportunistic)) {
       return assigned;
     }
 
@@ -342,7 +378,7 @@ public Resource assignContainer(FSSchedulerNode node) {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
         continue;
       }
-      assigned = sched.assignContainer(node);
+      assigned = sched.assignContainer(node, opportunistic);
       if (!assigned.equals(none())) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Assigned container in queue:" + getName() + " " +
@@ -540,7 +576,7 @@ private Resource minShareStarvation() {
     Resource desiredShare = Resources.min(policy.getResourceCalculator(),
         scheduler.getClusterResource(), getMinShare(), getDemand());
 
-    Resource starvation = Resources.subtract(desiredShare, getResourceUsage());
+    Resource starvation = Resources.subtract(desiredShare, getGuaranteedResourceUsage());
     boolean starved = !Resources.isNone(starvation);
 
     long now = scheduler.getClock().getTime();
@@ -598,7 +634,7 @@ protected void dumpStateInternal(StringBuilder sb) {
         ", SteadyFairShare: " + getSteadyFairShare() +
         ", MaxShare: " + getMaxShare() +
         ", MinShare: " + minShare +
-        ", ResourceUsage: " + getResourceUsage() +
+        ", ResourceUsage: " + getGuaranteedResourceUsage() +
         ", Demand: " + getDemand() +
         ", Runnable: " + getNumRunnableApps() +
        ", NumPendingApps: " + getNumPendingApps() +
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index a8e53fc..811aef7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -119,6 +119,34 @@ public Resource getDemand() {
   }
 
   @Override
+  public Resource getGuaranteedResourceUsage() {
+    Resource guaranteedResource = Resources.createResource(0);
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        Resources.addTo(guaranteedResource, child.getGuaranteedResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return guaranteedResource;
+  }
+
+  @Override
+  public Resource getOpportunisticResourceUsage() {
+    Resource opportunisticResource = Resource.newInstance(0, 0);
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        Resources.addTo(opportunisticResource,
+            child.getOpportunisticResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return opportunisticResource;
+  }
+
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
     // Limit demand to maxResources
@@ -177,14 +205,17 @@ private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) {
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     Resource assigned = Resources.none();
 
     // If this queue is over its limit, reject
-    if (!assignContainerPreCheck(node)) {
+    if (!assignContainerPreCheck(node, opportunistic)) {
       return assigned;
     }
 
+    // TODO: try to promote OPPORTUNISTIC containers if opportunistic is true.
+    // That is, we promote before trying to allocate opportunistic containers.
+
     // Hold the write lock when sorting childQueues
     writeLock.lock();
     try {
@@ -204,7 +235,7 @@ public Resource assignContainer(FSSchedulerNode node) {
     readLock.lock();
     try {
       for (FSQueue child : childQueues) {
-        assigned = child.assignContainer(node);
+        assigned = child.assignContainer(node, opportunistic);
         if (!Resources.equals(assigned, Resources.none())) {
           break;
         }
@@ -288,7 +319,7 @@ protected void dumpStateInternal(StringBuilder sb) {
         ", SteadyFairShare: " + getSteadyFairShare() +
         ", MaxShare: " + getMaxShare() +
         ", MinShare: " + minShare +
-        ", ResourceUsage: " + getResourceUsage() +
+        ", Guaranteed ResourceUsage: " + getGuaranteedResourceUsage() +
         ", Demand: " + getDemand() +
         ", MaxAMShare: " + maxAMShare +
         ", Runnable: " + getNumRunnableApps() +
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 4babfd5..0abc523 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -57,7 +57,7 @@
   private Resource fairShare = Resources.createResource(0, 0);
   private Resource steadyFairShare = Resources.createResource(0, 0);
   private Resource reservedResource = Resources.createResource(0, 0);
-  private final Resource resourceUsage = Resource.newInstance(0, 0);
+  private final Resource guaranteedResourceUsage = Resource.newInstance(0, 0);
 
   private final String name;
   protected final FairScheduler scheduler;
   private final YarnAuthorizationProvider authorizer;
@@ -234,7 +234,7 @@ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
     if (getFairShare().getMemorySize() == 0) {
       queueInfo.setCurrentCapacity(0.0f);
     } else {
-      queueInfo.setCurrentCapacity((float) getResourceUsage().getMemorySize() /
+      queueInfo.setCurrentCapacity((float) getGuaranteedResourceUsage().getMemorySize() /
           getFairShare().getMemorySize());
     }
 
@@ -418,14 +418,17 @@ public abstract void collectSchedulerApplications(
    *
    * @return true if check passes (can assign) or false otherwise
    */
-  boolean assignContainerPreCheck(FSSchedulerNode node) {
-    if (node.getReservedContainer() != null) {
+  boolean assignContainerPreCheck(FSSchedulerNode node, boolean opportunistic) {
+    if (opportunistic) {
+      // always pre-approve OPPORTUNISTIC containers to be assigned on the node
+      return true;
+    } else if (node.getReservedContainer() != null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Assigning container failed on node '" + node.getNodeName()
             + " because it has reserved containers.");
       }
       return false;
-    } else if (!Resources.fitsIn(getResourceUsage(), getMaxShare())) {
+    } else if (!Resources.fitsIn(getGuaranteedResourceUsage(), getMaxShare())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Assigning container failed on node '" + node.getNodeName()
             + " because queue resource usage is larger than MaxShare: "
@@ -448,7 +451,7 @@ public boolean isActive() {
   @Override
   public String toString() {
     return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
-        getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+        getName(), getDemand(), getGuaranteedResourceUsage(), fairShare, getWeight());
   }
 
   @Override
@@ -480,8 +483,8 @@ public void decReservedResource(String nodeLabel, Resource resourceToDec) {
   }
 
   @Override
-  public Resource getResourceUsage() {
-    return resourceUsage;
+  public Resource getGuaranteedResourceUsage() {
+    return guaranteedResourceUsage;
   }
 
   /**
@@ -489,11 +492,11 @@ public Resource getResourceUsage() {
    *
    * @param res the resource to increase
    */
-  protected void incUsedResource(Resource res) {
-    synchronized (resourceUsage) {
-      Resources.addTo(resourceUsage, res);
+  protected void incUsedGuaranteedResource(Resource res) {
+    synchronized (guaranteedResourceUsage) {
+      Resources.addTo(guaranteedResourceUsage, res);
       if (parent != null) {
-        parent.incUsedResource(res);
+        parent.incUsedGuaranteedResource(res);
       }
     }
   }
 
@@ -503,11 +506,11 @@ protected void incUsedResource(Resource res) {
    *
    * @param res the resource to decrease
    */
-  protected void decUsedResource(Resource res) {
-    synchronized (resourceUsage) {
-      Resources.subtractFrom(resourceUsage, res);
+  protected void decUsedGuaranteedResource(Resource res) {
+    synchronized (guaranteedResourceUsage) {
+      Resources.subtractFrom(guaranteedResourceUsage, res);
       if (parent != null) {
-        parent.decUsedResource(res);
+        parent.decUsedGuaranteedResource(res);
       }
     }
   }
 
@@ -520,7 +523,7 @@ public Priority getDefaultApplicationPriority() {
 
   boolean fitsInMaxShare(Resource additionalResource) {
     Resource usagePlusAddition =
-        Resources.add(getResourceUsage(), additionalResource);
+        Resources.add(getGuaranteedResourceUsage(), additionalResource);
 
     if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
       if (LOG.isDebugEnabled()) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 8ea07ab..60a6bbb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -164,7 +164,6 @@
   private float reservableNodesRatio; // percentage of available nodes
                                       // an app can be reserved on
-
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
 
   // Continuous Scheduling enabled or not
   protected boolean continuousSchedulingEnabled;
@@ -184,6 +183,8 @@
   boolean maxAssignDynamic;
   protected int maxAssign; // Max containers to assign per heartbeat
 
+  protected boolean oversubscriptionEnabled;
+
   @VisibleForTesting
   final MaxRunningAppsEnforcer maxRunningEnforcer;
@@ -957,13 +958,13 @@
    * resources for preempted containers.
    * @param node Node to check
    */
-  static void assignPreemptedContainers(FSSchedulerNode node) {
+  static void attemptToAssignPreemptedResources(FSSchedulerNode node) {
     for (Entry<FSAppAttempt, Resource> entry :
         node.getPreemptionList().entrySet()) {
       FSAppAttempt app = entry.getKey();
       Resource preemptionPending = Resources.clone(entry.getValue());
       while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
-        Resource assigned = app.assignContainer(node);
+        Resource assigned = app.assignContainer(node, false);
         if (Resources.isNone(assigned) ||
             assigned.equals(FairScheduler.CONTAINER_RESERVED)) {
           // Fail to assign, let's not try further
@@ -995,44 +996,81 @@ void attemptScheduling(FSSchedulerNode node) {
       // Assign new containers...
       // 1. Ensure containers are assigned to the apps that preempted
       // 2. Check for reserved applications
-      // 3. Schedule if there are no reservations
+      // 3. Schedule GUARANTEED containers if there are no reservations
+      // 4. Schedule OPPORTUNISTIC containers if possible
 
       // Apps may wait for preempted containers
       // We have to satisfy these first to avoid cases, when we preempt
      // a container for A from B and C gets the preempted containers,
      // when C does not qualify for preemption itself.
-      assignPreemptedContainers(node);
-      FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
-      boolean validReservation = false;
-      if (reservedAppSchedulable != null) {
-        validReservation = reservedAppSchedulable.assignReservedContainer(node);
-      }
+      attemptToAssignPreemptedResources(node);
+
+      boolean validReservation = attemptToAssignReservedResources(node);
       if (!validReservation) {
-        // No reservation, schedule at queue which is farthest below fair share
-        int assignedContainers = 0;
-        Resource assignedResource = Resources.clone(Resources.none());
-        Resource maxResourcesToAssign = Resources.multiply(
-            node.getUnallocatedResource(), 0.5f);
-        while (node.getReservedContainer() == null) {
-          Resource assignment = queueMgr.getRootQueue().assignContainer(node);
-          if (assignment.equals(Resources.none())) {
-            break;
-          }
+        // only attempt to assign GUARANTEED containers if there is no
+        // reservation on the node, because the reservation has to be
+        // satisfied first
+        attemptToAssignResourcesAsGuaranteedContainers(node);
+      }
 
-          assignedContainers++;
-          Resources.addTo(assignedResource, assignment);
-          if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
-              assignedResource)) {
-            break;
-          }
-        }
+      // attempt to assign OPPORTUNISTIC containers regardless of whether
+      // we have made a reservation or assigned a GUARANTEED container
+      if (oversubscriptionEnabled) {
+        attemptToAssignResourcesAsOpportunisticContainers(node);
       }
+
       updateRootQueueMetrics();
     } finally {
       writeLock.unlock();
     }
   }
 
+  /**
+   * Assign the reserved resource to the application that reserved it.
+   */
+  private boolean attemptToAssignReservedResources(FSSchedulerNode node) {
+    boolean success = false;
+    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+    if (reservedAppSchedulable != null) {
+      success = reservedAppSchedulable.assignReservedContainer(node);
+    }
+    return success;
+  }
+
+  private void attemptToAssignResourcesAsGuaranteedContainers(
+      FSSchedulerNode node) {
+    // No reservation, schedule at queue which is farthest below fair share
+    int assignedContainers = 0;
+    Resource assignedResource = Resources.clone(Resources.none());
+    Resource maxResourcesToAssign = Resources.multiply(
+        node.getUnallocatedResource(), 0.5f);
+    while (node.getReservedContainer() == null) {
+      Resource assignment =
+          queueMgr.getRootQueue().assignContainer(node, false);
+      if (assignment.equals(Resources.none())) {
+        break;
+      }
+      assignedContainers++;
+      Resources.addTo(assignedResource, assignment);
+
+      if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
+          assignedResource)) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Try to assign OPPORTUNISTIC containers as long as there are resources
+   * available to assign.
+   * @param node the node to assign OPPORTUNISTIC containers on
+   */
+  private void attemptToAssignResourcesAsOpportunisticContainers(
+      FSSchedulerNode node) {
+    while (!Resources.none().equals(
+        queueMgr.getRootQueue().assignContainer(node, true))) {
+    }
+  }
+
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
     return super.getApplicationAttempt(appAttemptId);
   }
@@ -1272,6 +1310,7 @@ private void initScheduler(Configuration conf) throws IOException {
     sizeBasedWeight = this.conf.getSizeBasedWeight();
     usePortForNodeName = this.conf.getUsePortForNodeName();
     reservableNodesRatio = this.conf.getReservableNodes();
+    oversubscriptionEnabled = this.conf.isOversubscriptionEnabled();
 
     updateInterval = this.conf.getUpdateInterval();
     if (updateInterval < 0) {
@@ -1639,7 +1678,7 @@ private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app,
     }
 
     // maxShare
-    if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), consumption),
+    if (!Resources.fitsIn(Resources.add(cur.getGuaranteedResourceUsage(), consumption),
         cur.getMaxShare())) {
       throw new YarnException("Moving app attempt " + appAttId + " to queue "
           + queueName + " would violate queue maxShare constraints on"
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index 9c9eee6..79aab9a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -223,6 +223,11 @@ public int getContinuousSchedulingSleepMs() {
     return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS,
         DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
   }
 
+  public boolean isOversubscriptionEnabled() {
+    return getBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED);
+  }
+
   public long getLocalityDelayNodeMs() {
     return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
   }
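A sketch of how the new scheduling passes could be driven from a test, in the style of the MockNodes changes further down. MockNodes.newNodeInfo(int, Resource, OverAllocationInfo) is added by this patch; the OverAllocationInfo.newInstance and ResourceThresholds.newInstance factory shapes are assumptions, since they are not shown in this diff.

```java
// Hypothetical test fragment. "scheduler" is assumed to be a started
// FairScheduler whose configuration has
// YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED set to true.
// The newInstance factory signatures below are assumed, not confirmed here.
OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
    ResourceThresholds.newInstance(0.75f, 0.75f));
MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(
    1, Resources.createResource(8192, 8), overAllocationInfo);
scheduler.handle(new NodeAddedSchedulerEvent(node));
// A node heartbeat triggers attemptScheduling(), which now runs the four
// passes above: preempted resources, reservations, GUARANTEED containers,
// and finally the OPPORTUNISTIC pass gated on oversubscriptionEnabled.
scheduler.handle(new NodeUpdateSchedulerEvent(node));
```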
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
index bd1ff7a..f018929 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
@@ -58,8 +58,17 @@
    */
   Resource getDemand();
 
-  /** Get the aggregate amount of resources consumed by the schedulable. */
-  Resource getResourceUsage();
+  /**
+   * Get the aggregate amount of guaranteed resources consumed by the
+   * schedulable.
+   */
+  Resource getGuaranteedResourceUsage();
+
+  /**
+   * Get the aggregate amount of opportunistic resources consumed by the
+   * schedulable.
+   */
+  Resource getOpportunisticResourceUsage();
 
   /** Minimum Resource share assigned to the schedulable. */
   Resource getMinShare();
@@ -89,8 +98,10 @@
   /**
    * Assign a container on this node if possible, and return the amount of
    * resources assigned.
+   * @param node the node to assign containers on
+   * @param opportunistic whether to assign OPPORTUNISTIC containers or not
    */
-  Resource assignContainer(FSSchedulerNode node);
+  Resource assignContainer(FSSchedulerNode node, boolean opportunistic);
 
   /** Get the fair share assigned to this Schedulable. */
   Resource getFairShare();
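Any Schedulable implementation outside this patch has to pick up the renamed and added methods. A minimal fragment (not a complete class) for an implementation that never tracks OPPORTUNISTIC usage, much like the FakeSchedulable test stub below; `usage` and `assignGuaranteedContainer` stand in for whatever the implementation did before and are hypothetical names.

```java
@Override
public Resource getGuaranteedResourceUsage() {
  return usage; // whatever getResourceUsage() returned before this patch
}

@Override
public Resource getOpportunisticResourceUsage() {
  return Resource.newInstance(0, 0); // nothing opportunistic tracked here
}

@Override
public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
  if (opportunistic) {
    // a schedulable that cannot oversubscribe declines the OPPORTUNISTIC pass
    return Resources.none();
  }
  return assignGuaranteedContainer(node); // hypothetical pre-existing logic
}
```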
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 59635d9..f366891 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -169,8 +169,8 @@ protected int compareAttribrutes(Schedulable s1, Schedulable s2) {
       extends DominantResourceFairnessComparator {
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
-      Resource usage1 = s1.getResourceUsage();
-      Resource usage2 = s2.getResourceUsage();
+      Resource usage1 = s1.getGuaranteedResourceUsage();
+      Resource usage2 = s2.getGuaranteedResourceUsage();
       Resource minShare1 = s1.getMinShare();
       Resource minShare2 = s2.getMinShare();
       Resource clusterCapacity = fsContext.getClusterResource();
@@ -370,9 +370,9 @@ int compareRatios(float[][] ratios1, float[][] ratios2, int index) {
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
       ResourceInformation[] resourceInfo1 =
-          s1.getResourceUsage().getResources();
+          s1.getGuaranteedResourceUsage().getResources();
       ResourceInformation[] resourceInfo2 =
-          s2.getResourceUsage().getResources();
+          s2.getGuaranteedResourceUsage().getResources();
       ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources();
       ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources();
       ResourceInformation[] clusterInfo =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 8179aa7..7ecbeea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -90,8 +90,8 @@ public int compare(Schedulable s1, Schedulable s2) {
       int res = compareDemand(s1, s2);
 
       // Pre-compute resource usages to avoid duplicate calculation
-      Resource resourceUsage1 = s1.getResourceUsage();
-      Resource resourceUsage2 = s2.getResourceUsage();
+      Resource resourceUsage1 = s1.getGuaranteedResourceUsage();
+      Resource resourceUsage2 = s2.getGuaranteedResourceUsage();
 
       if (res == 0) {
         res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
index 913513c..fe313a6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
@@ -88,7 +88,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
     amMaxResources = new ResourceInfo(Resource.newInstance(
         queue.getMetrics().getMaxAMShareMB(),
         queue.getMetrics().getMaxAMShareVCores()));
-    usedResources = new ResourceInfo(queue.getResourceUsage());
+    usedResources = new ResourceInfo(queue.getGuaranteedResourceUsage());
     demandResources = new ResourceInfo(queue.getDemand());
     fractionMemUsed = (float)usedResources.getMemorySize() /
         clusterResources.getMemorySize();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 9325efc..2349a39 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -99,7 +99,7 @@ public static Resource newAvailResource(Resource total, Resource used) {
     return rs;
   }
 
-  private static class MockRMNodeImpl implements RMNode {
+  public static class MockRMNodeImpl implements RMNode {
     private NodeId nodeId;
     private String hostName;
     private String nodeAddr;
@@ -114,12 +114,26 @@ public static Resource newAvailResource(Resource total, Resource used) {
     private ResourceUtilization containersUtilization;
     private ResourceUtilization nodeUtilization;
     private Resource physicalResource;
+    private OverAllocationInfo overAllocationInfo;
+    private List<UpdatedContainerInfo> containerUpdates =
+        Collections.EMPTY_LIST;
 
     public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
         Resource perNode, String rackName, String healthReport,
         long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
         Set<String> labels, ResourceUtilization containersUtilization,
         ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
+      this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport,
+          lastHealthReportTime, cmdPort, hostName, state, labels,
+          containersUtilization, nodeUtilization, pPhysicalResource, null);
+    }
+
+    public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
+        Resource perNode, String rackName, String healthReport,
+        long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
+        Set<String> labels, ResourceUtilization containersUtilization,
+        ResourceUtilization nodeUtilization, Resource pPhysicalResource,
+        OverAllocationInfo overAllocationInfo) {
       this.nodeId = nodeId;
       this.nodeAddr = nodeAddr;
       this.httpAddress = httpAddress;
@@ -134,6 +148,7 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
       this.containersUtilization = containersUtilization;
       this.nodeUtilization = nodeUtilization;
       this.physicalResource = pPhysicalResource;
+      this.overAllocationInfo = overAllocationInfo;
     }
 
     @Override
@@ -221,7 +236,7 @@ public String getNodeManagerVersion() {
     @Override
     public List<UpdatedContainerInfo> pullContainerUpdates() {
-      return new ArrayList<UpdatedContainerInfo>();
+      return containerUpdates;
     }
 
     @Override
@@ -265,7 +280,7 @@ public ResourceUtilization getNodeUtilization() {
     @Override
     public OverAllocationInfo getOverAllocationInfo() {
-      return null;
+      return this.overAllocationInfo;
     }
 
     public OpportunisticContainersStatus getOpportunisticContainersStatus() {
@@ -290,6 +305,19 @@ public Integer getDecommissioningTimeout() {
     public Resource getPhysicalResource() {
       return this.physicalResource;
     }
+
+    public void updateResourceUtilization(ResourceUtilization utilization) {
+      this.nodeUtilization = utilization;
+    }
+
+    public void updateContainersAndNodeUtilization(
+        UpdatedContainerInfo updatedContainerInfo,
+        ResourceUtilization resourceUtilization) {
+      if (updatedContainerInfo != null) {
+        containerUpdates = Collections.singletonList(updatedContainerInfo);
+      }
+      this.nodeUtilization = resourceUtilization;
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,
@@ -313,6 +341,15 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
       NodeState state, String httpAddr, int hostnum, String hostName, int port,
       Set<String> labels, ResourceUtilization containersUtilization,
       ResourceUtilization nodeUtilization, Resource physicalResource) {
+    return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port,
+        labels, containersUtilization, nodeUtilization, physicalResource, null);
+  }
+
+  private static MockRMNodeImpl buildRMNode(int rack, final Resource perNode,
+      NodeState state, String httpAddr, int hostnum, String hostName, int port,
+      Set<String> labels, ResourceUtilization containersUtilization,
+      ResourceUtilization nodeUtilization, Resource physicalResource,
+      OverAllocationInfo overAllocationInfo) {
     final String rackName = "rack"+ rack;
     final int nid = hostnum;
     final String nodeAddr = hostName + ":" + nid;
@@ -325,9 +362,8 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
     String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
     return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
         rackName, healthReport, 0, nid, hostName, state, labels,
-        containersUtilization, nodeUtilization, physicalResource);
+        containersUtilization, nodeUtilization, physicalResource, overAllocationInfo);
   }
-
   public static RMNode nodeInfo(int rack, final Resource perNode,
       NodeState state) {
     return buildRMNode(rack, perNode, state, "N/A");
@@ -356,4 +392,10 @@ public static RMNode newNodeInfo(int rack, final Resource perNode,
     return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName,
         port);
   }
+
+  public static MockRMNodeImpl newNodeInfo(int rack, final Resource perNode,
+      OverAllocationInfo overAllocationInfo) {
+    return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0",
+        NODE_ID++, null, 123, null, ResourceUtilization.newInstance(0, 0, 0.0f),
+        ResourceUtilization.newInstance(0, 0, 0.0f), null, overAllocationInfo);
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 42bf2d3..7d13e2e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -483,7 +483,7 @@ private void checkFSQueue(ResourceManager rm,
     FSParentQueue root = scheduler.getQueueManager().getRootQueue();
     // ************ check cluster used Resources ********
     assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
-    assertEquals(usedResources,root.getResourceUsage());
+    assertEquals(usedResources,root.getGuaranteedResourceUsage());
 
     // ************ check app headroom ****************
     FSAppAttempt schedulerAttempt =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index 03332b2..6adc63c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -82,7 +82,7 @@ public FakeSchedulable(Resource minShare, Resource maxShare,
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     return null;
   }
 
@@ -112,11 +112,16 @@ public Priority getPriority() {
   }
 
   @Override
-  public Resource getResourceUsage() {
+  public Resource getGuaranteedResourceUsage() {
     return usage;
   }
 
   @Override
+  public Resource getOpportunisticResourceUsage() {
+    return Resource.newInstance(0, 0);
+  }
+
+  @Override
   public long getStartTime() {
     return startTime;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
index f581935..6d0af47 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
@@ -222,7 +222,7 @@ public void testMoveRunnableApp() throws Exception {
     scheduler.handle(nodeEvent);
     scheduler.handle(updateEvent);
 
-    assertEquals(Resource.newInstance(1024, 1), oldQueue.getResourceUsage());
+    assertEquals(Resource.newInstance(1024, 1), oldQueue.getGuaranteedResourceUsage());
     scheduler.update();
     assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand());
 
@@ -231,8 +231,8 @@ public void testMoveRunnableApp() throws Exception {
     assertSame(targetQueue, app.getQueue());
     assertFalse(oldQueue.isRunnableApp(app));
     assertTrue(targetQueue.isRunnableApp(app));
-    assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage());
-    assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage());
+    assertEquals(Resource.newInstance(0, 0), oldQueue.getGuaranteedResourceUsage());
+    assertEquals(Resource.newInstance(1024, 1), targetQueue.getGuaranteedResourceUsage());
     assertEquals(0, oldQueue.getNumRunnableApps());
     assertEquals(1, targetQueue.getNumRunnableApps());
     assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 46187d9..39c4d87 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -223,7 +223,7 @@ public void testHeadroom() {
     Mockito.when(mockQueue.getMaxShare()).thenReturn(queueMaxResources);
     Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
-    Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
+    Mockito.when(mockQueue.getGuaranteedResourceUsage()).thenReturn(queueUsage);
     Mockito.when(mockScheduler.getClusterResource()).thenReturn
         (clusterResource);
     Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
@@ -305,7 +305,7 @@ public void testHeadroomWithBlackListedNodes() {
getApplicationId())); FSAppAttempt app = scheduler.getSchedulerApp(id11); assertNotNull(app); - Resource queueUsage = app.getQueue().getResourceUsage(); + Resource queueUsage = app.getQueue().getGuaranteedResourceUsage(); assertEquals(0, queueUsage.getMemorySize()); assertEquals(0, queueUsage.getVirtualCores()); SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 4a738ca..efe4ad1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -88,7 +88,7 @@ public void testUpdateDemand() { FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); - Mockito.when(app.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(Resources.none()); schedulable.addApp(app, true); schedulable.addApp(app, true); @@ -176,7 +176,7 @@ public void run() { @Override public void run() { for (int i=0; i < 500; i++) { - schedulable.getResourceUsage(); + schedulable.getGuaranteedResourceUsage(); } } }); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java index 6726f17..f79ba4c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java @@ -98,7 +98,7 @@ private FSAppAttempt createStarvingApp(FSSchedulerNode schedulerNode, ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId); - when(starvingApp.assignContainer(schedulerNode)).thenAnswer( + when(starvingApp.assignContainer(schedulerNode, false)).thenAnswer( new Answer() { @Override public Resource answer(InvocationOnMock invocationOnMock) @@ -142,7 +142,7 @@ private void finalValidation(FSSchedulerNode schedulerNode) { } private void allocateContainers(FSSchedulerNode schedulerNode) { - FairScheduler.assignPreemptedContainers(schedulerNode); + FairScheduler.attemptToAssignPreemptedResources(schedulerNode); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 42d4f81..59a5876 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -55,13 +55,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -71,6 +77,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -92,6 +100,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -1054,15 +1063,15 @@ public void testSimpleContainerAllocation() throws IOException { assertEquals( FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(2, scheduler.getQueueManager().getQueue("queue1"). 
- getResourceUsage().getVirtualCores()); + getGuaranteedResourceUsage().getVirtualCores()); // verify metrics QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1") @@ -1097,7 +1106,7 @@ public void testSimpleContainerReservation() throws Exception { // Make sure queue 1 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); @@ -1107,7 +1116,7 @@ public void testSimpleContainerReservation() throws Exception { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize()); // Now another node checks in with capacity @@ -1121,7 +1130,7 @@ public void testSimpleContainerReservation() throws Exception { // Make sure this goes to queue 2 assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // The old reservation should still be there... assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize()); @@ -1131,7 +1140,7 @@ public void testSimpleContainerReservation() throws Exception { } - @Test (timeout = 5000) + @Test public void testOffSwitchAppReservationThreshold() throws Exception { conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f); scheduler.init(conf); @@ -1171,7 +1180,7 @@ public void testOffSwitchAppReservationThreshold() throws Exception { // Verify capacity allocation assertEquals(6144, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Create new app with a resource request that can be satisfied by any // node but would be @@ -1203,7 +1212,7 @@ public void testOffSwitchAppReservationThreshold() throws Exception { scheduler.update(); scheduler.handle(new NodeUpdateSchedulerEvent(node4)); assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); scheduler.handle(new NodeUpdateSchedulerEvent(node1)); scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -1264,7 +1273,7 @@ public void testRackLocalAppReservationThreshold() throws Exception { // Verify capacity allocation assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Create new app with a resource request that can be satisfied by any // node but would be @@ -1309,7 +1318,7 @@ public void testRackLocalAppReservationThreshold() throws Exception { scheduler.update(); scheduler.handle(new NodeUpdateSchedulerEvent(node4)); assertEquals(10240, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); scheduler.handle(new NodeUpdateSchedulerEvent(node1)); scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -1353,7 +1362,7 @@ public void testReservationThresholdWithAssignMultiple() throws Exception { // Verify capacity allocation assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). 
- getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Verify number of reservations have decremented assertEquals(0, @@ -1397,7 +1406,7 @@ public void testContainerReservationAttemptExceedingQueueMax() // Make sure queue 1 is allocated app capacity assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise createSchedulingRequest(1024, "queue2", "user2", 1); @@ -1406,7 +1415,7 @@ public void testContainerReservationAttemptExceedingQueueMax() // Make sure queue 2 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); @@ -1532,7 +1541,7 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception { // Make sure queue 1 is allocated app capacity assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise createSchedulingRequest(1024, "queue2", "user2", 1); @@ -1541,7 +1550,7 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception { // Make sure queue 2 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); @@ -1581,12 +1590,12 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception { // Make sure allocated memory of queue1 doesn't exceed its maximum assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); //the reservation of queue1 should be reclaim assertEquals(0, scheduler.getSchedulerApp(attId1). getCurrentReservation().getMemorySize()); assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); } @Test @@ -1626,7 +1635,7 @@ public void testReservationThresholdGatesReservations() throws Exception { // Make sure queue 1 is allocated app capacity assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests below threshold ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); @@ -1635,7 +1644,7 @@ public void testReservationThresholdGatesReservations() throws Exception { // Make sure queue 2 has no reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(0, scheduler.getSchedulerApp(attId).getReservedContainers().size()); @@ -1646,7 +1655,7 @@ public void testReservationThresholdGatesReservations() throws Exception { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). 
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
         .getVirtualCores());
@@ -1661,7 +1670,7 @@ public void testReservationThresholdGatesReservations() throws Exception {
     // Make sure this goes to queue 2
     assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getVirtualCores());
+        getGuaranteedResourceUsage().getVirtualCores());
     // The old reservation should still be there...
     assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
@@ -2694,7 +2703,358 @@ public void testReservationWithMultiplePriorities() throws IOException {
         2, liveContainers.iterator().next().getContainer().
             getPriority().getPriority());
   }
-
+
+  /**
+   * Test that NO OPPORTUNISTIC containers can be allocated on a node that
+   * is fully allocated and has very high utilization.
+   */
+  @Test
+  public void testAllocateNoOpportunisticContainersOnBusyNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 2G of memory and 2 vcores and an overallocation threshold
+      // of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(2048, 2), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the node's full memory
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization shoots up after the container runs on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(2000, 0, 0.8f));
+
+      // create another scheduling request
+      ApplicationAttemptId appAttempt2
+          = createSchedulingRequest(100, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue("Expecting no containers allocated",
+          allocatedContainers2.size() == 0);
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // verify that a reservation is made for the second resource request
+      Resource reserved = scheduler.getNode(node.getNodeID()).
+          getReservedContainer().getReservedResource();
+      assertTrue("Expect a reservation made for the second resource request",
+          reserved.equals(Resource.newInstance(100, 1)));
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test that OPPORTUNISTIC containers can be allocated on a node with low
+   * utilization even though there is not enough unallocated resource on the
+   * node to accommodate the request.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation threshold
+      // of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that leaves some unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(3600, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(1800, 0, 0.5f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // verify that no reservation is made for the second request given that it's
+      // satisfied by an OPPORTUNISTIC container allocation.
+      assertTrue("No reservation should be made because we have satisfied"
+          + " the second request with an OPPORTUNISTIC container allocation",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test that opportunistic containers can be allocated on a node that is
+   * fully allocated but whose utilization is very low.
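+   * The node advertises overallocation thresholds (0.75 of capacity for
+   * both memory and CPU here), so the scheduler may hand out OPPORTUNISTIC
+   * containers as long as the reported utilization stays below those
+   * thresholds, even when no unallocated resources remain.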
+   */
+  @Test
+  public void testAllocateOpportunisticContainersOnFullyAllocatedNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation threshold
+      // of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the whole node
+      ApplicationAttemptId appAttempt1 = createSchedulingRequest(
+          4096, "queue1", "user1", 4);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(1800, 0, 0.5f));
+
+      // create another scheduling request. Now that there are no unallocated
+      // resources left on the node, the request should be served with an
+      // allocation of an opportunistic container
+      ApplicationAttemptId appAttempt2 = createSchedulingRequest(
+          1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // verify that no reservation is made for the second request given that it's
+      // satisfied by an OPPORTUNISTIC container allocation.
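+      // (with a 0.75 memory threshold on this 4096 MB node, reported
+      // utilization of 1800 MB leaves roughly 1272 MB of overallocation
+      // headroom, which covers the 1024 MB request)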
+      assertTrue("No reservation should be made because we have satisfied"
+          + " the second request with an OPPORTUNISTIC container allocation",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test that opportunistic containers can be allocated on a node with low
+   * utilization even though there are GUARANTEED containers allocated.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersWithGuaranteedOnes()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation threshold
+      // of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(3200, "queue1", "user1", 3);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3200, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(512, 0, 0.1f));
+
+      // create two other scheduling requests which in aggregate ask for more
+      // than what's left unallocated on the node.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(512, "queue2", "user1", 1);
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(512, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // verify that no reservation is made given that the second request should
+      // be satisfied by a GUARANTEED container allocation, the third by an
+      // OPPORTUNISTIC container allocation.
+      assertTrue("No reservation should be made.",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
   @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's
@@ -3684,7 +4044,7 @@ public void testMultipleCompletedEvent() throws Exception {
         .createAbnormalContainerStatus(container.getContainerId(),
             SchedulerUtils.COMPLETED_APPLICATION),
         RMContainerEventType.FINISHED);
-    assertEquals(Resources.none(), app1.getResourceUsage());
+    assertEquals(Resources.none(), app1.getGuaranteedResourceUsage());
   }

   @Test
@@ -3784,7 +4144,7 @@ public void testQueueMaxAMShare() throws Exception {
     assertEquals("Application1's AM should be finished",
         0, app1.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app1.getResourceUsage());
+        Resources.none(), app1.getGuaranteedResourceUsage());
     assertEquals("Application3's AM should be running",
         1, app3.getLiveContainers().size());
     assertEquals("Application3's AM requests 1024 MB memory",
@@ -3804,7 +4164,7 @@ public void testQueueMaxAMShare() throws Exception {
     assertEquals("Application4's AM should not be running",
         0, app4.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app4.getResourceUsage());
+        Resources.none(), app4.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
@@ -3820,7 +4180,7 @@
     assertEquals("Application5's AM should not be running",
         0, app5.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app5.getResourceUsage());
+        Resources.none(), app5.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
@@ -3833,7 +4193,7 @@
     assertEquals("Application5's AM should not be running",
         0, app5.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app5.getResourceUsage());
+        Resources.none(), app5.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB
memory", 2048, queue1.getAmResourceUsage().getMemorySize()); @@ -3849,11 +4209,11 @@ public void testQueueMaxAMShare() throws Exception { assertEquals("Application2's AM should be finished", 0, app2.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app2.getResourceUsage()); + Resources.none(), app2.getGuaranteedResourceUsage()); assertEquals("Application3's AM should be finished", 0, app3.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app3.getResourceUsage()); + Resources.none(), app3.getGuaranteedResourceUsage()); assertEquals("Application5's AM should be running", 1, app5.getLiveContainers().size()); assertEquals("Application5's AM requests 2048 MB memory", @@ -3874,7 +4234,7 @@ public void testQueueMaxAMShare() throws Exception { assertEquals("Application5's AM should have 0 container", 0, app5.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app5.getResourceUsage()); + Resources.none(), app5.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemorySize()); scheduler.update(); @@ -3898,7 +4258,7 @@ public void testQueueMaxAMShare() throws Exception { assertEquals("Application6's AM should not be running", 0, app6.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app6.getResourceUsage()); + Resources.none(), app6.getGuaranteedResourceUsage()); assertEquals("Application6's AM resource shouldn't be updated", 0, app6.getAMResource().getMemorySize()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -4614,17 +4974,25 @@ public void testResourceUsageByMoveApp() throws Exception { FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true); FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true); - Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB); + Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2"); - Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0); + Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(), + 0); } @Test (expected = YarnException.class) @@ -4664,7 +5032,7 @@ public void testMoveWouldViolateMaxResourcesConstraints() throws Exception { scheduler.handle(updateEvent); scheduler.handle(updateEvent); - assertEquals(Resource.newInstance(2048, 2), 
oldQueue.getResourceUsage()); + assertEquals(Resource.newInstance(2048, 2), oldQueue.getGuaranteedResourceUsage()); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); } @@ -5088,7 +5456,7 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception { scheduler.handle(new NodeUpdateSchedulerEvent(node2)); assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); //container will be reserved at node1 RMContainer reservedContainer1 = @@ -5108,7 +5476,7 @@ public void testContainerAllocationWithContainerIdLeap() throws Exception { app1, RMAppAttemptState.KILLED, false)); assertEquals(0, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // container will be allocated at node2 scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -5256,10 +5624,12 @@ public void testUpdateDemand() throws IOException { FSAppAttempt app1 = mock(FSAppAttempt.class); Mockito.when(app1.getDemand()).thenReturn(maxResource); - Mockito.when(app1.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app1.getGuaranteedResourceUsage()). + thenReturn(Resources.none()); FSAppAttempt app2 = mock(FSAppAttempt.class); Mockito.when(app2.getDemand()).thenReturn(maxResource); - Mockito.when(app2.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app2.getGuaranteedResourceUsage()). + thenReturn(Resources.none()); QueueManager queueManager = scheduler.getQueueManager(); FSParentQueue queue1 = queueManager.getParentQueue("queue1", true); @@ -5315,7 +5685,7 @@ public void testDumpState() throws IOException { child1.setMaxShare(new ConfigurableResource(resource)); FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(resource); - Mockito.when(app.getResourceUsage()).thenReturn(resource); + Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(resource); child1.addApp(app, true); child1.updateDemand(); @@ -5351,7 +5721,7 @@ public void testDumpState() throws IOException { + " SteadyFairShare: ," + " MaxShare: ," + " MinShare: ," - + " ResourceUsage: ," + + " Guaranteed ResourceUsage: ," + " Demand: ," + " MaxAMShare: 0.5," + " Runnable: 0}"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index b016c1b..1e4f05e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -243,11 +243,16 @@ public Resource getDemand() { } @Override - public Resource getResourceUsage() { + public Resource getGuaranteedResourceUsage() { return usage; } @Override + public Resource getOpportunisticResourceUsage() { + return Resource.newInstance(0, 0); + } + + @Override public Resource getMinShare() { return minShare; } @@ -278,7 +283,7 @@ public void updateDemand() { } @Override - public Resource assignContainer(FSSchedulerNode 
node) { + public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) { throw new UnsupportedOperationException(); }