diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index e71ddff..b92a3d1 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -206,6 +207,11 @@ public ResourceUtilization getNodeUtilization() { } @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + + @Override public long getUntrackedTimeStamp() { return 0; } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 6b7ac3c..b95b58a 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -195,6 +196,11 @@ public ResourceUtilization getNodeUtilization() { } @Override + public OverAllocationInfo getOverAllocationInfo() { + return node.getOverAllocationInfo(); + } + + @Override public long getUntrackedTimeStamp() { return 0; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index a42d053..02057a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -393,7 +393,8 @@ public RegisterNodeManagerResponse registerNodeManager( .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion, physicalResource); + resolve(host), capability, nodeManagerVersion, physicalResource, + request.getOverAllocationInfo()); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java index 320f262..25777af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java @@ -51,7 +51,7 @@ public static TempSchedulerNode fromSchedulerNode( FiCaSchedulerNode schedulerNode) { TempSchedulerNode n = new TempSchedulerNode(); - n.totalResource = Resources.clone(schedulerNode.getTotalResource()); + n.totalResource = Resources.clone(schedulerNode.getCapacity()); n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource()); n.runningContainers = schedulerNode.getCopiedListOfRunningContainers(); n.reservedContainer = schedulerNode.getReservedContainer(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index ab15c95..dc48263 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; /** * Node managers information on available resources @@ -114,6 +115,12 @@ public ResourceUtilization getNodeUtilization(); /** + * Get the node overallocation threshold. + * @return the overallocation threshold + */ + OverAllocationInfo getOverAllocationInfo(); + + /** * the physical resources in the node. * @return the physical resources in the node. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d0bfecf..01d2ce2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; @@ -109,6 +110,7 @@ private final WriteLock writeLock; private final ConcurrentLinkedQueue nodeUpdateQueue; + private final OverAllocationInfo overallocationInfo; private volatile boolean nextHeartBeat = true; private final NodeId nodeId; @@ -364,12 +366,13 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { this(nodeId, context, hostName, cmPort, httpPort, node, capability, - nodeManagerVersion, null); + nodeManagerVersion, null, null); } public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, - String nodeManagerVersion, Resource physResource) { + String nodeManagerVersion, Resource physResource, + OverAllocationInfo overAllocationInfo) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -384,6 +387,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.nodeManagerVersion = nodeManagerVersion; this.timeStamp = 0; this.physicalResource = physResource; + this.overallocationInfo = overAllocationInfo; this.latestNodeHeartBeatResponse.setResponseId(0); @@ -533,6 +537,11 @@ public ResourceUtilization getNodeUtilization() { } } + @Override + public OverAllocationInfo getOverAllocationInfo() { + return this.overallocationInfo; + } + public void setNodeUtilization(ResourceUtilization nodeUtilization) { this.writeLock.lock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index f2da1fe..6b6cb23 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -340,7 +340,7 @@ protected void containerLaunchedOnNode( } application.containerLaunchedOnNode(containerId, node.getNodeID()); - node.containerStarted(containerId); + node.containerLaunched(containerId); } finally { readLock.unlock(); } @@ -804,7 +804,7 @@ public void updateNodeResource(RMNode nm, writeLock.lock(); SchedulerNode node = getSchedulerNode(nm.getNodeID()); Resource newResource = resourceOption.getResource(); - Resource oldResource = node.getTotalResource(); + Resource oldResource = node.getCapacity(); if (!oldResource.equals(newResource)) { // Notify NodeLabelsManager about this change rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index ccec6bc..0c06fe1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -81,7 +81,7 @@ public void addNode(N node) { nodesList.add(node); // Update cluster capacity - Resources.addTo(clusterCapacity, node.getTotalResource()); + Resources.addTo(clusterCapacity, node.getCapacity()); // Update maximumAllocation updateMaxResources(node, true); @@ -174,7 +174,7 @@ public N removeNode(NodeId nodeId) { } // Update cluster capacity - Resources.subtractFrom(clusterCapacity, node.getTotalResource()); + Resources.subtractFrom(clusterCapacity, node.getCapacity()); // Update maximumAllocation updateMaxResources(node, false); @@ -234,7 +234,7 @@ public Resource getMaxAllowedAllocation() { } private void updateMaxResources(SchedulerNode node, boolean add) { - Resource totalResource = node.getTotalResource(); + Resource totalResource = node.getCapacity(); writeLock.lock(); try { if (add) { // added node diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 5ac2ac5..4abb2d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -298,37 +298,56 @@ public ContainerId matchContainerToOutstandingIncreaseReq( /** * Swaps the existing RMContainer's and the temp RMContainers internal - * container references after adjusting the resources in each. + * container references after adjusting the resources in each. This + * assumes the two containers are allocated on the same node. * @param tempRMContainer Temp RMContainer. * @param existingRMContainer Existing RMContainer. + * @param node the node on which the containers to be swapped are allocated * @param updateType Update Type. * @return Existing RMContainer after swapping the container references. */ public RMContainer swapContainer(RMContainer tempRMContainer, - RMContainer existingRMContainer, ContainerUpdateType updateType) { + RMContainer existingRMContainer, SchedulerNode node, + ContainerUpdateType updateType) { + assert existingRMContainer.getNodeId().equals(node.getNodeID()); + assert tempRMContainer.getNodeId().equals(node.getNodeID()); + ContainerId matchedContainerId = existingRMContainer.getContainerId(); // Swap updated container with the existing container Container tempContainer = tempRMContainer.getContainer(); + Container existingContainer = existingRMContainer.getContainer(); Resource updatedResource = createUpdatedResource( - tempContainer, existingRMContainer.getContainer(), updateType); + tempContainer, existingContainer, updateType); Resource resourceToRelease = createResourceToRelease( - existingRMContainer.getContainer(), updateType); + existingContainer, updateType); Container newContainer = Container.newInstance(matchedContainerId, - existingRMContainer.getContainer().getNodeId(), - existingRMContainer.getContainer().getNodeHttpAddress(), + existingContainer.getNodeId(), + existingContainer.getNodeHttpAddress(), updatedResource, - existingRMContainer.getContainer().getPriority(), null, + existingContainer.getPriority(), null, tempContainer.getExecutionType()); newContainer.setAllocationRequestId( - existingRMContainer.getContainer().getAllocationRequestId()); - newContainer.setVersion(existingRMContainer.getContainer().getVersion()); + existingContainer.getAllocationRequestId()); + newContainer.setVersion(existingContainer.getVersion()); + + Container updatedTempContainer = Container.newInstance( + tempContainer.getId(), tempContainer.getNodeId(), + tempContainer.getNodeHttpAddress(), + resourceToRelease, tempContainer.getPriority(), + tempContainer.getContainerToken(), + existingContainer.getExecutionType()); - tempRMContainer.getContainer().setResource(resourceToRelease); - tempRMContainer.getContainer().setExecutionType( - existingRMContainer.getContainer().getExecutionType()); + synchronized (node) { + ((RMContainerImpl) existingRMContainer).setContainer(newContainer); + // notify schedulerNode of the update to correct resource accounting + node.containerUpdated(existingRMContainer, existingContainer); + + ((RMContainerImpl) tempRMContainer).setContainer(updatedTempContainer); + // notify SchedulerNode of the update to correct resource accounting + node.containerUpdated(tempRMContainer, tempContainer); + } - ((RMContainerImpl)existingRMContainer).setContainer(newContainer); return existingRMContainer; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index db63cd8..4afaddd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -849,8 +849,10 @@ protected synchronized void addToNewlyAllocatedContainers( getRMContainer(matchedContainerId); if (existingRMContainer != null) { // swap containers + SchedulerNode node = rmContext.getScheduler().getSchedulerNode( + tempRMContainer.getNodeId()); existingRMContainer = getUpdateContext().swapContainer( - tempRMContainer, existingRMContainer, updateTpe); + tempRMContainer, existingRMContainer, node, updateTpe); getUpdateContext().removeFromOutstandingUpdate( tempRMContainer.getAllocatedSchedulerKey(), existingRMContainer.getContainer()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 05dbf1e..ad12e33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -57,30 +57,39 @@ private static final Log LOG = LogFactory.getLog(SchedulerNode.class); + private Resource capacity; private Resource unallocatedResource = Resource.newInstance(0, 0); - private Resource allocatedResource = Resource.newInstance(0, 0); - private Resource totalResource; + private RMContainer reservedContainer; - private volatile int numContainers; private volatile ResourceUtilization containersUtilization = ResourceUtilization.newInstance(0, 0, 0f); private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap<>(); + private final Map + allocatedContainers = new HashMap<>(); + + private volatile int numGuaranteedContainers = 0; + private Resource allocatedResourceGuaranteed = Resource.newInstance(0, 0); + + private volatile int numOpportunisticContainers = 0; + private Resource allocatedResourceOpportunistic = Resource.newInstance(0, 0); private final RMNode rmNode; private final String nodeName; + // The total amount of resources requested by containers that have been + // allocated but not yet launched on the node. + protected Resource resourceAllocatedPendingLaunch = + Resource.newInstance(0, 0); + private volatile Set labels = null; public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; this.unallocatedResource = Resources.clone(node.getTotalCapability()); - this.totalResource = Resources.clone(node.getTotalCapability()); + this.capacity = Resources.clone(node.getTotalCapability()); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -102,9 +111,9 @@ public RMNode getRMNode() { * @param resource Total resources on the node. */ public synchronized void updateTotalResource(Resource resource){ - this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, - this.allocatedResource); + this.capacity = Resources.clone(resource); + this.unallocatedResource = Resources.subtract(capacity, + this.allocatedResourceGuaranteed); } /** @@ -163,17 +172,64 @@ public void allocateContainer(RMContainer rmContainer) { protected synchronized void allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { Container container = rmContainer.getContainer(); - if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { - deductUnallocatedResource(container.getResource()); - ++numContainers; - } - launchedContainers.put(container.getId(), + allocatedContainers.put(container.getId(), new ContainerInfo(rmContainer, launchedOnNode)); + + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceAllocated( + container.getResource()); + numGuaranteedContainers++; + } else { + opportunisticContainerResourceAllocated( + container.getResource()); + numOpportunisticContainers++; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " and type " + + container.getExecutionType() + " on host " + toString()); + } + } + + /** + * Update resource bookkeeping upon an allocation of a GUARANTEED + * container. + * @param resource resource of the allocated GUARANTEED container. + */ + @VisibleForTesting + public synchronized void guaranteedContainerResourceAllocated( + Resource resource) { + if (containerResourceAllocated(resource, allocatedResourceGuaranteed)) { + Resources.subtractFrom(unallocatedResource, resource); + } + } + + /** + * Updates resource bookkeeping upon an allocation of an OPPORTUNISTIC + * container. + * @param resource resource of the allocated OPPORTUNISTIC container. + */ + private void opportunisticContainerResourceAllocated( + Resource resource) { + containerResourceAllocated(resource, allocatedResourceOpportunistic); + } + + private boolean containerResourceAllocated(Resource allocated, + Resource aggregatedResources) { + if (allocated == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return false; + } + Resources.addTo(resourceAllocatedPendingLaunch, allocated); + Resources.addTo(aggregatedResources, allocated); + return true; } + /** - * Get unallocated resources on the node. + * Get resources that are not allocated to GUARANTEED containers on the node. * @return Unallocated resources on the node */ public synchronized Resource getUnallocatedResource() { @@ -181,42 +237,57 @@ public synchronized Resource getUnallocatedResource() { } /** - * Get allocated resources on the node. - * @return Allocated resources on the node + * Get resources allocated to GUARANTEED containers on the node. + * @return Allocated resources to GUARANTEED containers on the node */ public synchronized Resource getAllocatedResource() { - return this.allocatedResource; + return this.allocatedResourceGuaranteed; + } + + /** + * Get resources allocated to OPPORTUNISTIC containers on the node. + * @return Allocated resources to OPPORTUNISTIC containers on the node + */ + public synchronized Resource getOpportunisticResourceAllocated() { + return this.allocatedResourceOpportunistic; + } + + @VisibleForTesting + public synchronized Resource getResourceAllocatedPendingLaunch() { + return this.resourceAllocatedPendingLaunch; } /** * Get total resources on the node. * @return Total resources on the node. */ - public synchronized Resource getTotalResource() { - return this.totalResource; + public synchronized Resource getCapacity() { + return this.capacity; } /** - * Check if a container is launched by this node. + * Check if a GUARANTEED container is launched by this node. * @return If the container is launched by the node. */ - public synchronized boolean isValidContainer(ContainerId containerId) { - if (launchedContainers.containsKey(containerId)) { - return true; - } - return false; + @VisibleForTesting + public synchronized boolean isValidGuaranteedContainer( + ContainerId containerId) { + ContainerInfo containerInfo = allocatedContainers.get(containerId); + return containerInfo != null && ExecutionType.GUARANTEED == + containerInfo.container.getExecutionType(); } /** - * Update the resources of the node when releasing a container. - * @param container Container to release. + * Check if an OPPORTUNISTIC container is launched by this node. + * @param containerId id of the container to check + * @return If the container is launched by the node. */ - protected synchronized void updateResourceForReleasedContainer( - Container container) { - if (container.getExecutionType() == ExecutionType.GUARANTEED) { - addUnallocatedResource(container.getResource()); - --numContainers; - } + @VisibleForTesting + public synchronized boolean isValidOpportunisticContainer( + ContainerId containerId) { + ContainerInfo containerInfo = allocatedContainers.get(containerId); + return containerInfo != null && ExecutionType.OPPORTUNISTIC == + containerInfo.container.getExecutionType(); } /** @@ -226,25 +297,48 @@ protected synchronized void updateResourceForReleasedContainer( */ public synchronized void releaseContainer(ContainerId containerId, boolean releasedByNode) { - ContainerInfo info = launchedContainers.get(containerId); - if (info == null) { + RMContainer rmContainer = getContainer(containerId); + if (rmContainer == null) { + LOG.warn("Invalid container " + containerId + " is released."); return; } - if (!releasedByNode && info.launchedOnNode) { - // wait until node reports container has completed + + if (!allocatedContainers.containsKey(containerId)) { + // do not process if the container is never allocated on the node + return; + } + if (!releasedByNode && + allocatedContainers.get(containerId).launchedOnNode) { + // only process if the container has not been launched on a node + // yet or it is released by node. return; } - launchedContainers.remove(containerId); - Container container = info.container.getContainer(); - updateResourceForReleasedContainer(container); + Container container = rmContainer.getContainer(); + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceReleased(container); + // do not update allocated containers until the resources of the + // container are released because we need to check if we need + // to update resourceAllocatedPendingLaunch in case the container + // has not been launched on the node. + allocatedContainers.remove(containerId); + numGuaranteedContainers--; + } else { + opportunisticContainerResourceReleased(container); + // do not update allocated containers until the resources of the + // container are released because we need to check if we need + // to update resourceAllocatedPendingLaunch in case the container + // has not been launched on the node. + allocatedContainers.remove(containerId); + numOpportunisticContainers--; + } if (LOG.isDebugEnabled()) { - LOG.debug("Released container " + container.getId() + " of capacity " - + container.getResource() + " on host " + rmNode.getNodeAddress() - + ", which currently has " + numContainers + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available" + ", release resources=" + true); + if (LOG.isDebugEnabled()) { + LOG.debug("Released " + container.getExecutionType() + " container " + + containerId + " of " + "capacity " + container.getResource() + + " on node (" + toString() + ")" + ", release resources=" + true); + } } } @@ -252,42 +346,87 @@ public synchronized void releaseContainer(ContainerId containerId, * Inform the node that a container has launched. * @param containerId ID of the launched container */ - public synchronized void containerStarted(ContainerId containerId) { - ContainerInfo info = launchedContainers.get(containerId); - if (info != null) { + public synchronized void containerLaunched(ContainerId containerId) { + ContainerInfo info = allocatedContainers.get(containerId); + if (info != null && !info.launchedOnNode) { info.launchedOnNode = true; + Resources.subtractFrom(resourceAllocatedPendingLaunch, + info.container.getContainer().getResource()); } } /** - * Add unallocated resources to the node. This is used when unallocating a - * container. - * @param resource Resources to add. + * Update the resources of the node when releasing a GUARANTEED container. + * @param container Container to release. + */ + protected synchronized void guaranteedContainerResourceReleased( + Container container) { + assert container.getExecutionType() == ExecutionType.GUARANTEED; + + if (containerResourceReleased(container, allocatedResourceGuaranteed)) { + Resources.addTo(unallocatedResource, container.getResource()); + } + } + + /** + * Update the resources of the node when releasing an OPPORTUNISTIC container. + * @param container Container to release. */ - private synchronized void addUnallocatedResource(Resource resource) { - if (resource == null) { + private void opportunisticContainerResourceReleased( + Container container) { + assert container.getExecutionType() == ExecutionType.OPPORTUNISTIC; + + containerResourceReleased(container, allocatedResourceOpportunistic); + } + + private boolean containerResourceReleased(Container container, + Resource aggregatedResource) { + Resource released = container.getResource(); + if (released == null) { LOG.error("Invalid resource addition of null resource for " + rmNode.getNodeAddress()); - return; + return false; + } + Resources.subtractFrom(aggregatedResource, released); + + if (!allocatedContainers.get(container.getId()).launchedOnNode) { + // update resourceAllocatedPendingLaunch if the container is has + // not yet been launched on the node + Resources.subtractFrom(resourceAllocatedPendingLaunch, released); } - Resources.addTo(unallocatedResource, resource); - Resources.subtractFrom(allocatedResource, resource); + return true; } /** - * Deduct unallocated resources from the node. This is used when allocating a - * container. - * @param resource Resources to deduct. + * Update a container that has been previously launched on the node. + * Container update (increase/decrease/promote/demote) is implemented + * by swapping the underlying container instance, so we need to call + * this method to update the resource accounting. + * @param rmContainer the rm container that has the new information + * @param oldContainer the container that has the previous information */ - @VisibleForTesting - public synchronized void deductUnallocatedResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; + public synchronized void containerUpdated(RMContainer rmContainer, + Container oldContainer) { + assert rmContainer.getContainerId().equals(oldContainer.getId()); + + if (allocatedContainers.containsKey(oldContainer.getId())) { + if (oldContainer.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceReleased(oldContainer); + numGuaranteedContainers--; + } else { + opportunisticContainerResourceReleased(oldContainer); + numOpportunisticContainers--; + } + + Container newContainer = rmContainer.getContainer(); + if (newContainer.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceAllocated(newContainer.getResource()); + numGuaranteedContainers++; + } else { + opportunisticContainerResourceAllocated(newContainer.getResource()); + numOpportunisticContainers++; + } } - Resources.subtractFrom(unallocatedResource, resource); - Resources.addTo(allocatedResource, resource); } /** @@ -307,17 +446,28 @@ public abstract void reserveResource(SchedulerApplicationAttempt attempt, @Override public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" - + getNumContainers() + " available=" + getUnallocatedResource() - + " used=" + getAllocatedResource(); + return "host: " + rmNode.getNodeAddress() + " #guaranteed containers=" + + getNumGuaranteedContainers() + " #opportunistic containers=" + + getNumOpportunisticContainers() + " available=" + + getUnallocatedResource() + " used by guaranteed containers=" + + allocatedResourceGuaranteed + " used by opportunistic containers=" + + allocatedResourceOpportunistic; + } + + /** + * Get number of active GUARANTEED containers on the node. + * @return Number of active OPPORTUNISTIC containers on the node. + */ + public int getNumGuaranteedContainers() { + return numGuaranteedContainers; } /** - * Get number of active containers on the node. - * @return Number of active containers on the node. + * Get number of active OPPORTUNISTIC containers on the node. + * @return Number of active OPPORTUNISTIC containers on the node. */ - public int getNumContainers() { - return numContainers; + public int getNumOpportunisticContainers() { + return numOpportunisticContainers; } /** @@ -325,8 +475,8 @@ public int getNumContainers() { * @return A copy of containers running on the node. */ public synchronized List getCopiedListOfRunningContainers() { - List result = new ArrayList<>(launchedContainers.size()); - for (ContainerInfo info : launchedContainers.values()) { + List result = new ArrayList<>(allocatedContainers.size()); + for (ContainerInfo info : allocatedContainers.values()) { result.add(info.container); } return result; @@ -336,12 +486,14 @@ public int getNumContainers() { * Get the containers running on the node with AM containers at the end. * @return A copy of running containers with AM containers at the end. */ - public synchronized List getRunningContainersWithAMsAtTheEnd() { + public synchronized List + getRunningGuaranteedContainersWithAMsAtTheEnd() { LinkedList result = new LinkedList<>(); - for (ContainerInfo info : launchedContainers.values()) { + for (ContainerInfo info : allocatedContainers.values()) { if(info.container.isAMContainer()) { result.addLast(info.container); - } else { + } else if (info.container.getExecutionType() == + ExecutionType.GUARANTEED){ result.addFirst(info.container); } } @@ -354,12 +506,9 @@ public int getNumContainers() { * @return The container for the specified container ID */ protected synchronized RMContainer getContainer(ContainerId containerId) { - RMContainer container = null; - ContainerInfo info = launchedContainers.get(containerId); - if (info != null) { - container = info.container; - } - return container; + ContainerInfo info = allocatedContainers.get(containerId); + + return info != null ? info.container : null; } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java index fa71a25..ea30d78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java @@ -31,11 +31,11 @@ private final Resource used; private final Resource avail; private final int num; - + public SchedulerNodeReport(SchedulerNode node) { this.used = node.getAllocatedResource(); this.avail = node.getUnallocatedResource(); - this.num = node.getNumContainers(); + this.num = node.getNumGuaranteedContainers(); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d91aa55..1fb49a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1600,7 +1600,7 @@ private void addNode(RMNode nodeManager) { // update this node to node label manager if (labelManager != null) { labelManager.activateNode(nodeManager.getNodeID(), - schedulerNode.getTotalResource()); + schedulerNode.getCapacity()); } Resource clusterResource = getClusterResource(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 72dfbdd..c591f3a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -500,13 +500,13 @@ private ContainerAllocation assignContainer(Resource clusterResource, Resource capability = pendingAsk.getPerAllocationResource(); Resource available = node.getUnallocatedResource(); - Resource totalResource = node.getTotalResource(); + Resource totalResource = node.getCapacity(); if (!Resources.lessThanOrEqual(rc, clusterResource, capability, totalResource)) { LOG.warn("Node : " + node.getNodeID() + " does not have sufficient resource for ask : " + pendingAsk - + " node total capability : " + node.getTotalResource()); + + " node total capability : " + node.getCapacity()); // Skip this locality request ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index a12c5ec..9e75a71 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -952,7 +952,7 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) { diagnosticMessageBldr.append(" ( Partition : "); diagnosticMessageBldr.append(node.getLabels()); diagnosticMessageBldr.append(", Total resource : "); - diagnosticMessageBldr.append(node.getTotalResource()); + diagnosticMessageBldr.append(node.getCapacity()); diagnosticMessageBldr.append(", Available resource : "); diagnosticMessageBldr.append(node.getUnallocatedResource()); diagnosticMessageBldr.append(" )."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 7277779..eca4cf5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -144,9 +144,9 @@ public synchronized void markContainerToNonKillable(ContainerId containerId) { } @Override - protected synchronized void updateResourceForReleasedContainer( + protected synchronized void guaranteedContainerResourceReleased( Container container) { - super.updateResourceForReleasedContainer(container); + super.guaranteedContainerResourceReleased(container); if (killableContainers.containsKey(container.getId())) { Resources.subtractFrom(totalKillableResources, container.getResource()); killableContainers.remove(container.getId()); @@ -168,9 +168,10 @@ protected synchronized void allocateContainer(RMContainer rmContainer, final Container container = rmContainer.getContainer(); LOG.info("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + getRMNode().getNodeAddress() - + ", which has " + getNumContainers() + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available after allocation"); + + ", which has " + getNumGuaranteedContainers() + " guaranteed" + + " containers using " + getAllocatedResource() + ", " + + getNumOpportunisticContainers() + " opportunistic containers" + + " using " + getOpportunisticResourceAllocated()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index b3e59c5..8327a79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -159,7 +159,7 @@ private PreemptableContainers identifyContainersToPreemptOnNode( // Figure out list of containers to consider List containersToCheck = - node.getRunningContainersWithAMsAtTheEnd(); + node.getRunningGuaranteedContainersWithAMsAtTheEnd(); containersToCheck.removeAll(node.getContainersForPreemption()); // Initialize potential with unallocated but not reserved resources diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 44ec9c3..95490f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -242,11 +242,12 @@ protected synchronized void allocateContainer(RMContainer rmContainer, super.allocateContainer(rmContainer, launchedOnNode); if (LOG.isDebugEnabled()) { final Container container = rmContainer.getContainer(); - LOG.debug("Assigned container " + container.getId() + " of capacity " + LOG.info("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + getRMNode().getNodeAddress() - + ", which has " + getNumContainers() + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available after allocation"); + + ", which has " + getNumGuaranteedContainers() + " guaranteed " + + "containers using " + getAllocatedResource() + ", " + + getNumOpportunisticContainers() + " opportunistic containers " + + "using " + getOpportunisticResourceAllocated()); } Resource allocated = rmContainer.getAllocatedResource(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 611c7f2..9325efc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -262,6 +263,11 @@ public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + public OpportunisticContainersStatus getOpportunisticContainersStatus() { return OpportunisticContainersStatus.newInstance(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index c1cb4c1..b0da5df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -217,13 +217,14 @@ public void testSchedulerRecovery() throws Exception { Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); - assertTrue(schedulerNode1.isValidContainer(runningContainer - .getContainerId())); - assertFalse(schedulerNode1.isValidContainer(completedContainer - .getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + runningContainer.getContainerId())); + assertFalse(schedulerNode1.isValidGuaranteedContainer( + completedContainer.getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); @@ -360,13 +361,14 @@ public void testDynamicQueueRecovery() throws Exception { Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); - assertTrue( - schedulerNode1.isValidContainer(runningContainer.getContainerId())); - assertFalse( - schedulerNode1.isValidContainer(completedContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + runningContainer.getContainerId())); + assertFalse(schedulerNode1.isValidGuaranteedContainer( + completedContainer.getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 4fc0ea4..8a5238b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -474,7 +474,7 @@ private void mockSchedulerNodes(String schedulerNodesConfigStr) totalRes = parseResourceFromString(resSring); } } - when(sn.getTotalResource()).thenReturn(totalRes); + when(sn.getCapacity()).thenReturn(totalRes); when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes)); // TODO, add settings of killable resources when necessary diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java index 964a230..89fb846 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java @@ -237,17 +237,17 @@ public void testBuilderWithSpecifiedNodeResources() throws Exception { // Check host resources Assert.assertEquals(3, this.cs.getAllNodes().size()); SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1)); - Assert.assertEquals(100, node1.getTotalResource().getMemorySize()); + Assert.assertEquals(100, node1.getCapacity().getMemorySize()); Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size()); Assert.assertNull(node1.getReservedContainer()); SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1)); - Assert.assertEquals(0, node2.getTotalResource().getMemorySize()); + Assert.assertEquals(0, node2.getCapacity().getMemorySize()); Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size()); Assert.assertNotNull(node2.getReservedContainer()); SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1)); - Assert.assertEquals(30, node3.getTotalResource().getMemorySize()); + Assert.assertEquals(30, node3.getCapacity().getMemorySize()); Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size()); Assert.assertNull(node3.getReservedContainer()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 979e68a..ef5a0c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -289,12 +289,12 @@ public void testUpdateMaxAllocationUsesTotal() throws IOException { SchedulerNode mockNode1 = mock(SchedulerNode.class); when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080)); when(mockNode1.getUnallocatedResource()).thenReturn(emptyResource); - when(mockNode1.getTotalResource()).thenReturn(fullResource1); + when(mockNode1.getCapacity()).thenReturn(fullResource1); SchedulerNode mockNode2 = mock(SchedulerNode.class); when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081)); when(mockNode2.getUnallocatedResource()).thenReturn(emptyResource); - when(mockNode2.getTotalResource()).thenReturn(fullResource2); + when(mockNode2.getCapacity()).thenReturn(fullResource2); verifyMaximumResourceCapability(configuredMaximumResource, scheduler); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java new file mode 100644 index 0000000..9ec41bf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java @@ -0,0 +1,602 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Matchers.shortThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for SchedulerNode. + */ +public class TestSchedulerNode { + private final Resource nodeCapacity = Resource.newInstance(1024*10, 4); + + @Test + public void testAllocateAndReleaseGuaranteedContainer() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(containerId)); + + // launch the container on the node + schedulerNode.containerLaunched(containerId); + + Assert.assertEquals("The container should have been launched", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, true); + Assert.assertEquals("The container should have been released", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + } + + @Test + public void testAllocateAndReleaseOpportunisticContainer() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertEquals("Incorrect remaining resource accounted.", + nodeCapacity, schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumOpportunisticContainers()); + Assert.assertTrue( + schedulerNode.isValidOpportunisticContainer(containerId)); + + // launch the container on the node + schedulerNode.containerLaunched(containerId); + + Assert.assertEquals("The container should have been launched", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, true); + Assert.assertEquals("The container should have been released", + 0, schedulerNode.getNumOpportunisticContainers()); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertFalse("The container should have been released", + schedulerNode.isValidOpportunisticContainer(containerId)); + } + + @Test + public void testAllocateAndReleaseContainers() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + + Resource guaranteedResource = Resource.newInstance(4096, 1); + RMContainer guaranteedContainer = + createRMContainer(0, guaranteedResource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId guaranteedContainerId = guaranteedContainer.getContainerId(); + + // allocate a guaranteed container on the node + schedulerNode.allocateContainer(guaranteedContainer); + + Assert.assertEquals("The guaranteed container should have been allocated", + guaranteedResource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, guaranteedResource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The guaranteed container should have been allocated" + + " but not launched", guaranteedResource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(guaranteedContainerId)); + + Resource opportunisticResource = Resource.newInstance(8192, 4); + RMContainer opportunisticContainer = + createRMContainer(1, opportunisticResource, + ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode()); + ContainerId opportunisticContainerId = opportunisticContainer.getContainerId(); + + // allocate an opportunistic container on the node + schedulerNode.allocateContainer(opportunisticContainer); + + Assert.assertEquals("The opportunistic container should have been" + + " allocated", opportunisticResource, + schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, guaranteedResource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The opportunistic container should also have been" + + " allocated but not launched", + Resources.add(guaranteedResource, opportunisticResource), + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumOpportunisticContainers()); + Assert.assertTrue( + schedulerNode.isValidOpportunisticContainer(opportunisticContainerId)); + + // launch both containers on the node + schedulerNode.containerLaunched(guaranteedContainerId); + schedulerNode.containerLaunched(opportunisticContainerId); + + Assert.assertEquals("Both containers should have been launched", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + + // release both containers + schedulerNode.releaseContainer(guaranteedContainerId, true); + schedulerNode.releaseContainer(opportunisticContainerId, true); + + Assert.assertEquals("The guaranteed container should have been released", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The opportunistic container should have been released", + 0, schedulerNode.getNumOpportunisticContainers()); + Assert.assertEquals("The guaranteed container should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + Assert.assertEquals("The opportunistic container should have been released", + Resources.none(), schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertFalse("The guaranteed container should have been released", + schedulerNode.isValidGuaranteedContainer(guaranteedContainerId)); + Assert.assertFalse("The opportunistic container should have been released", + schedulerNode.isValidOpportunisticContainer(opportunisticContainerId)); + } + + @Test + public void testReleaseLaunchedContainerNotAsNode() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(containerId)); + + // launch the container on the node + schedulerNode.containerLaunched(containerId); + + Assert.assertEquals("The container should have been launched", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, false); + Assert.assertEquals("The container should not have been released", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The container should not have been released", + resource, schedulerNode.getAllocatedResource()); + Assert.assertTrue("The container should not have been released", + schedulerNode.isValidGuaranteedContainer(containerId)); + } + + @Test + public void testReleaseUnlaunchedContainerAsNode() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", + resource, schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(containerId)); + + // make sure the container is not launched yet + Assert.assertEquals("The container should not be launched already", + resource, schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, true); + Assert.assertEquals("The container should have been released", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + Assert.assertFalse("The container should have been released", + schedulerNode.isValidGuaranteedContainer(containerId)); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + } + + @Test + public void testReleaseUnlaunchedContainerNotAsNode() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + Resource resource = Resource.newInstance(4096, 1); + RMContainer container = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + ContainerId containerId = container.getContainerId(); + + // allocate a container on the node + schedulerNode.allocateContainer(container); + + Assert.assertEquals("The container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(containerId)); + + // make sure the container is not launched yet + Assert.assertEquals("The container should not have been launched", + resource, schedulerNode.getResourceAllocatedPendingLaunch()); + + // release the container + schedulerNode.releaseContainer(containerId, false); + Assert.assertEquals("The container should have been released", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + Assert.assertFalse("The container should have been released", + schedulerNode.isValidGuaranteedContainer(containerId)); + Assert.assertEquals("The container should have been released", + Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch()); + } + + @Test + public void testContainerUpdateOfResourceIncrease() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + + Resource resource = Resource.newInstance(4096, 1); + RMContainerImpl rmContainer = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + Container oldContainer = rmContainer.getContainer(); + + // allocate the container on the node + schedulerNode.allocateContainer(rmContainer); + + Assert.assertEquals("The guaranteed container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The guaranteed container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The guaranteed container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(rmContainer.getContainerId())); + + // simulate a container update event of resource increase + Resource increase = Resource.newInstance(1024, 1); + Resource increasedResource = Resources.add(resource, increase); + Container newContainer = createContainer( + oldContainer.getId().getContainerId(), increasedResource, + oldContainer.getExecutionType(), schedulerNode.getRMNode()); + rmContainer.setContainer(newContainer); + + // notify the node of the container increase + schedulerNode.containerUpdated(rmContainer, oldContainer); + + Assert.assertEquals("Resource of the guaranteed container should increase", + increasedResource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, increasedResource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The resources that are allocated but not launched" + + " should have increased" , increasedResource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The guaranteed container should still be allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(rmContainer.getContainerId())); + } + + @Test + public void testContainerUpdateOfResourceDecrease() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + + Resource resource = Resource.newInstance(4096, 4); + RMContainerImpl rmContainer = createRMContainer(0, resource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + Container oldContainer = rmContainer.getContainer(); + + // allocate the container on the node + schedulerNode.allocateContainer(rmContainer); + + Assert.assertEquals("The guaranteed container should have been allocated", + resource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, resource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The guaranteed container should have been allocated" + + " but not launched", resource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The guaranteed container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(rmContainer.getContainerId())); + + // simulate a container update event of resource decrease + Resource decreasedResource = + Resources.add(resource, Resource.newInstance(1024, 1)); + Container newContainer = createContainer( + oldContainer.getId().getContainerId(), decreasedResource, + oldContainer.getExecutionType(), schedulerNode.getRMNode()); + rmContainer.setContainer(newContainer); + + // notify the node of the container resource decrease event + schedulerNode.containerUpdated(rmContainer, oldContainer); + + Assert.assertEquals("Resource of the guaranteed container should decrease", + decreasedResource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, decreasedResource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The resources that are allocated but not launched" + + " should have decreased" , decreasedResource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The guaranteed container should still be allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(rmContainer.getContainerId())); + } + + @Test + public void testContainerUpdateOfPromotion() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + + Resource opportunisticResource = Resource.newInstance(4096, 4); + RMContainerImpl rmContainer = createRMContainer(0, opportunisticResource, + ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode()); + Container oldContainer = rmContainer.getContainer(); + + // allocate the container on the node + schedulerNode.allocateContainer(rmContainer); + + Assert.assertEquals("The opportunistic container should have" + + " been allocated", opportunisticResource, + schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertEquals("Incorrect remaining resource accounted.", + nodeCapacity, schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The opportunistic container should have" + + " been allocated but not launched", opportunisticResource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The opportunistic container should have been allocated", + 1, schedulerNode.getNumOpportunisticContainers()); + Assert.assertTrue(schedulerNode.isValidOpportunisticContainer( + rmContainer.getContainerId())); + + // simulate a container update event of promotion + // (ExecutionType.OPPORTUNISTIC -> Execution.GUARANTEED) + Container newContainer = createContainer( + oldContainer.getId().getContainerId(), oldContainer.getResource(), + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + rmContainer.setContainer(newContainer); + + // notify the node of the container promotion event + schedulerNode.containerUpdated(rmContainer, oldContainer); + + Assert.assertEquals("The opportunistic container should have been promoted", + newContainer.getResource(), schedulerNode.getAllocatedResource()); + Assert.assertEquals("The opportunistic container should have been promoted", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue("The opportunistic container should have been promoted", + schedulerNode.isValidGuaranteedContainer(rmContainer.getContainerId())); + Assert.assertEquals("The opportunistic container should have been promoted", + Resources.none(), schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertEquals("The opportunistic container should have been promoted", + 0, schedulerNode.getNumOpportunisticContainers()); + Assert.assertFalse( + schedulerNode.isValidOpportunisticContainer(rmContainer.getContainerId())); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, newContainer.getResource()), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals(oldContainer.getResource(), + schedulerNode.getResourceAllocatedPendingLaunch()); + } + + @Test + public void testContainerUpdateOfDemotion() { + SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity); + + Resource guaranteedResource = Resource.newInstance(4096, 4); + RMContainerImpl rmContainer = createRMContainer(0, guaranteedResource, + ExecutionType.GUARANTEED, schedulerNode.getRMNode()); + Container oldContainer = rmContainer.getContainer(); + + // allocate the container on the node + schedulerNode.allocateContainer(rmContainer); + + Assert.assertEquals("The guaranteed container should have been allocated", + guaranteedResource, schedulerNode.getAllocatedResource()); + Assert.assertEquals("Incorrect remaining resource accounted.", + Resources.subtract(nodeCapacity, guaranteedResource), + schedulerNode.getUnallocatedResource()); + Assert.assertEquals("The guaranteed container should have been allocated" + + " but not launched", guaranteedResource, + schedulerNode.getResourceAllocatedPendingLaunch()); + Assert.assertEquals("The guaranteed container should have been allocated", + 1, schedulerNode.getNumGuaranteedContainers()); + Assert.assertTrue( + schedulerNode.isValidGuaranteedContainer(rmContainer.getContainerId())); + + // simulate a container update event of demotion + // (ExecutionType.GUARANTEED -> Execution.OPPORTUNISTIC) + Container newContainer = createContainer( + oldContainer.getId().getContainerId(), oldContainer.getResource(), + ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode()); + rmContainer.setContainer(newContainer); + + // notify the node of the container demotion event + schedulerNode.containerUpdated(rmContainer, oldContainer); + + Assert.assertEquals("The guaranteed container should have been demoted", + Resources.none(), schedulerNode.getAllocatedResource()); + Assert.assertEquals("The guaranteed container should have been demoted", + 0, schedulerNode.getNumGuaranteedContainers()); + Assert.assertFalse("The guaranteed container should have been demoted", + schedulerNode.isValidGuaranteedContainer(rmContainer.getContainerId())); + Assert.assertEquals("The guaranteed container should have been demoted", + newContainer.getResource(), + schedulerNode.getOpportunisticResourceAllocated()); + Assert.assertEquals("The guaranteed container should have been demoted", + 1, schedulerNode.getNumOpportunisticContainers()); + Assert.assertTrue( + schedulerNode.isValidOpportunisticContainer(rmContainer.getContainerId())); + Assert.assertEquals("Incorrect remaining resource accounted.", + nodeCapacity, schedulerNode.getUnallocatedResource()); + Assert.assertEquals(oldContainer.getResource(), + schedulerNode.getResourceAllocatedPendingLaunch()); + } + + private SchedulerNode createSchedulerNode(Resource capacity) { + NodeId nodeId = NodeId.newInstance("localhost", 0); + + RMNode rmNode = mock(RMNode.class); + when(rmNode.getNodeID()).thenReturn(nodeId); + when(rmNode.getHostName()).thenReturn(nodeId.getHost()); + when(rmNode.getTotalCapability()).thenReturn(capacity); + when(rmNode.getRackName()).thenReturn("/default"); + when(rmNode.getHttpAddress()).thenReturn(nodeId.getHost()); + when(rmNode.getNodeAddress()).thenReturn(nodeId.getHost()); + + return new SchedulerNodeForTest(rmNode); + } + + private static RMContainerImpl createRMContainer(long containerId, + Resource resource, ExecutionType executionType, RMNode node) { + Container container = + createContainer(containerId, resource, executionType, node); + + Dispatcher dispatcher = new AsyncDispatcher(); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(dispatcher); + when(rmContext.getSystemMetricsPublisher()). + thenReturn(new NoOpSystemMetricPublisher()); + when(rmContext.getYarnConfiguration()). + thenReturn(new YarnConfiguration()); + when(rmContext.getContainerAllocationExpirer()). + thenReturn(new ContainerAllocationExpirer(dispatcher)); + when(rmContext.getRMApplicationHistoryWriter()). + thenReturn(new RMApplicationHistoryWriter()); + + return new RMContainerImpl(container, null, + container.getId().getApplicationAttemptId(), + node.getNodeID(), "test", rmContext); + } + + private static Container createContainer(long containerId, Resource resource, + ExecutionType executionType, RMNode node) { + ApplicationAttemptId appAttemptId = ApplicationAttemptId. + newInstance(ApplicationId.newInstance(0, 0), 0); + ContainerId cId = + ContainerId.newContainerId(appAttemptId, containerId); + Container container = Container.newInstance( + cId, node.getNodeID(), node.getNodeAddress(), resource, + Priority.newInstance(0), null, executionType); + return container; + } + + + /** + * A test implementation of SchedulerNode for the purpose of testing + * SchedulerNode only. Resource reservation is scheduler-dependent, + * and therefore not covered here. + */ + private static final class SchedulerNodeForTest extends SchedulerNode + { + public SchedulerNodeForTest(RMNode node) { + super(node, false); + } + + @Override + public void reserveResource(SchedulerApplicationAttempt attempt, + SchedulerRequestKey schedulerKey, RMContainer container) { + } + + @Override + public void unreserveResource(SchedulerApplicationAttempt attempt) { + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1dea4ee..fe50eb1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -4154,7 +4154,7 @@ public void handle(Event event) { // Check total resource of scheduler node is also changed to 1 GB 1 core Resource totalResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + .getSchedulerNode(nm_0.getNodeId()).getCapacity(); Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB, totalResource.getMemorySize()); Assert.assertEquals("Total Resource Virtual Cores should be 1", 1, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 0c3130d..f6cb769 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -218,8 +218,8 @@ public void testCommitProposalForFailedAppAttempt() // nm1 runs 1 container(app1-container_01/AM) // nm2 runs 1 container(app1-container_02) - Assert.assertEquals(1, sn1.getNumContainers()); - Assert.assertEquals(1, sn2.getNumContainers()); + Assert.assertEquals(1, sn1.getNumGuaranteedContainers()); + Assert.assertEquals(1, sn2.getNumGuaranteedContainers()); // kill app attempt1 scheduler.handle( @@ -314,8 +314,8 @@ public void testCommitOutdatedReservedProposal() throws Exception { // nm1 runs 3 containers(app1-container_01/AM, app1-container_02, // app2-container_01/AM) // nm2 runs 1 container(app1-container_03) - Assert.assertEquals(3, sn1.getNumContainers()); - Assert.assertEquals(1, sn2.getNumContainers()); + Assert.assertEquals(3, sn1.getNumGuaranteedContainers()); + Assert.assertEquals(1, sn2.getNumGuaranteedContainers()); // reserve 1 container(app1-container_04) for app1 on nm1 ResourceRequest rr2 = ResourceRequest diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index a32352b..70398e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -373,7 +373,7 @@ public void testSingleQueueOneUserMetrics() throws Exception { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals( - (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB), + (int)(node_0.getCapacity().getMemorySize() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); } @@ -668,7 +668,7 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, a.getMetrics().getReservedMB()); assertEquals(0*GB, a.getMetrics().getAllocatedMB()); - assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()), + assertEquals((int)(a.getCapacity() * node_0.getCapacity().getMemorySize()), a.getMetrics().getAvailableMB()); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 740ef33..c56be29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -563,7 +563,7 @@ private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId, int numContainers) { CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); SchedulerNode node = cs.getSchedulerNode(nodeId); - Assert.assertEquals(numContainers, node.getNumContainers()); + Assert.assertEquals(numContainers, node.getNumGuaranteedContainers()); } /** @@ -1065,7 +1065,7 @@ public RMNodeLabelsManager createNodeLabelManager() { for (int i = 0; i < 50; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - if (schedulerNode1.getNumContainers() == 0) { + if (schedulerNode1.getNumGuaranteedContainers() == 0) { cycleWaited++; } } @@ -1131,7 +1131,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG + nodeIdStr + " ( Partition : [x]")); Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId()) - .getNumContainers()); + .getNumGuaranteedContainers()); rm1.close(); } @@ -1215,7 +1215,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x - Assert.assertEquals(10, schedulerNode1.getNumContainers()); + Assert.assertEquals(10, schedulerNode1.getNumGuaranteedContainers()); // check non-exclusive containers of LeafQueue is correctly updated LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); @@ -1943,7 +1943,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x - Assert.assertEquals(5, schedulerNode1.getNumContainers()); + Assert.assertEquals(5, schedulerNode1.getNumGuaranteedContainers()); SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() .getNodeReport(nm1.getNodeId()); @@ -2043,7 +2043,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x (non-exclusive) - Assert.assertEquals(3, schedulerNode1.getNumContainers()); + Assert.assertEquals(3, schedulerNode1.getNumGuaranteedContainers()); SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() .getNodeReport(nm1.getNodeId()); @@ -2074,7 +2074,7 @@ public RMNodeLabelsManager createNodeLabelManager() { cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); // app1 gets all resource in default partition - Assert.assertEquals(2, schedulerNode2.getNumContainers()); + Assert.assertEquals(2, schedulerNode2.getNumGuaranteedContainers()); // 3GB is used from label x quota. 2GB used from default label. // So total 2.5 GB is remaining. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 854a65c..5d31d4e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -319,7 +319,8 @@ public void run() { for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) { int i = ThreadLocalRandom.current().nextInt(-30, 30); synchronized (scheduler) { - node.deductUnallocatedResource(Resource.newInstance(i * 1024, i)); + node.guaranteedContainerResourceAllocated( + Resource.newInstance(i * 1024, i)); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java index 0e3d344..6726f17 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java @@ -87,7 +87,7 @@ private void saturateCluster(FSSchedulerNode schedulerNode) { while (!Resources.isNone(schedulerNode.getUnallocatedResource())) { createDefaultContainer(); schedulerNode.allocateContainer(containers.get(containers.size() - 1)); - schedulerNode.containerStarted(containers.get(containers.size() - 1). + schedulerNode.containerLaunched(containers.get(containers.size() - 1). getContainerId()); } } @@ -183,9 +183,9 @@ public void testMultipleAllocations() { assertEquals("Nothing should have been allocated, yet", Resources.none(), schedulerNode.getAllocatedResource()); schedulerNode.allocateContainer(containers.get(0)); - schedulerNode.containerStarted(containers.get(0).getContainerId()); + schedulerNode.containerLaunched(containers.get(0).getContainerId()); schedulerNode.allocateContainer(containers.get(1)); - schedulerNode.containerStarted(containers.get(1).getContainerId()); + schedulerNode.containerLaunched(containers.get(1).getContainerId()); schedulerNode.allocateContainer(containers.get(2)); assertEquals("Container should be allocated", Resources.multiply(containers.get(0).getContainer().getResource(), 3.0), @@ -225,7 +225,7 @@ public void testSimplePreemption() { schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); allocateContainers(schedulerNode); assertEquals("Container should be allocated", - schedulerNode.getTotalResource(), + schedulerNode.getCapacity(), schedulerNode.getAllocatedResource()); // Release all remaining containers @@ -266,7 +266,7 @@ public void testDuplicatePreemption() { schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); allocateContainers(schedulerNode); assertEquals("Container should be allocated", - schedulerNode.getTotalResource(), + schedulerNode.getCapacity(), schedulerNode.getAllocatedResource()); // Release all remaining containers @@ -312,7 +312,7 @@ public void testComplexPreemption() { allocateContainers(schedulerNode); assertEquals("Container should be allocated", - schedulerNode.getTotalResource(), + schedulerNode.getCapacity(), schedulerNode.getAllocatedResource()); // Release all containers @@ -360,7 +360,7 @@ public void testMultiplePreemptionEvents() { allocateContainers(schedulerNode); assertEquals("Container should be allocated", - schedulerNode.getTotalResource(), + schedulerNode.getCapacity(), schedulerNode.getAllocatedResource()); // Release all containers @@ -399,7 +399,7 @@ public void testPreemptionToCompletedApp() { when(starvingApp.isStopped()).thenReturn(true); allocateContainers(schedulerNode); assertNotEquals("Container should be allocated", - schedulerNode.getTotalResource(), + schedulerNode.getCapacity(), schedulerNode.getAllocatedResource()); // Release all containers @@ -437,7 +437,7 @@ public void testPartialReservedPreemption() { // Container partially reassigned allocateContainers(schedulerNode); assertEquals("Container should be allocated", - Resources.subtract(schedulerNode.getTotalResource(), + Resources.subtract(schedulerNode.getCapacity(), Resource.newInstance(512, 0)), schedulerNode.getAllocatedResource()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 6b2109d..e1f155e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3291,12 +3291,16 @@ private void testAMStrictLocality(boolean node, boolean invalid) scheduler.handle(node2UpdateEvent); if (invalid) { assertEquals(0, app.getLiveContainers().size()); - assertEquals(0, scheduler.getNode(node2.getNodeID()).getNumContainers()); - assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers()); + assertEquals(0, + scheduler.getNode(node2.getNodeID()).getNumGuaranteedContainers()); + assertEquals(0, + scheduler.getNode(node1.getNodeID()).getNumGuaranteedContainers()); } else { assertEquals(1, app.getLiveContainers().size()); - assertEquals(1, scheduler.getNode(node2.getNodeID()).getNumContainers()); - assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers()); + assertEquals(1, + scheduler.getNode(node2.getNodeID()).getNumGuaranteedContainers()); + assertEquals(0, + scheduler.getNode(node1.getNodeID()).getNumGuaranteedContainers()); } } @@ -4984,7 +4988,7 @@ public void handle(Event event) { // Check total resource of scheduler node is also changed to 0 GB 0 core Resource totalResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + .getSchedulerNode(nm_0.getNodeId()).getCapacity(); Assert.assertEquals(totalResource.getMemorySize(), 0 * GB); Assert.assertEquals(totalResource.getVirtualCores(), 0); // Check the available resource is 0/0 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 3f97b59..63da0c3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -340,7 +340,7 @@ public void testUpdateResourceOnNode() throws Exception { // SchedulerNode's total resource and available resource are changed. assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID()) - .getTotalResource().getMemorySize()); + .getCapacity().getMemorySize()); assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID()). getUnallocatedResource().getMemorySize(), 1024); QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); @@ -1266,7 +1266,7 @@ public void handle(Event event) { // Check total resource of scheduler node is also changed to 1 GB 1 core Resource totalResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + .getSchedulerNode(nm_0.getNodeId()).getCapacity(); Assert.assertEquals(totalResource.getMemorySize(), 1 * GB); Assert.assertEquals(totalResource.getVirtualCores(), 1); // Check the available resource is 0/0