diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index e71ddff..b92a3d1 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -206,6 +207,11 @@ public ResourceUtilization getNodeUtilization() { } @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + + @Override public long getUntrackedTimeStamp() { return 0; } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 6b7ac3c..b95b58a 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -195,6 +196,11 @@ public ResourceUtilization getNodeUtilization() { } @Override + public OverAllocationInfo getOverAllocationInfo() { + return node.getOverAllocationInfo(); + } + + @Override public long getUntrackedTimeStamp() { return 0; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index a42d053..02057a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -393,7 +393,8 @@ public RegisterNodeManagerResponse registerNodeManager( .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion, physicalResource); + resolve(host), capability, nodeManagerVersion, physicalResource, + request.getOverAllocationInfo()); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index ab15c95..dc48263 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; /** * Node managers information on available resources @@ -114,6 +115,12 @@ public ResourceUtilization getNodeUtilization(); /** + * Get the node overallocation threshold. + * @return the overallocation threshold + */ + OverAllocationInfo getOverAllocationInfo(); + + /** * the physical resources in the node. * @return the physical resources in the node. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d0bfecf..01d2ce2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; @@ -109,6 +110,7 @@ private final WriteLock writeLock; private final ConcurrentLinkedQueue nodeUpdateQueue; + private final OverAllocationInfo overallocationInfo; private volatile boolean nextHeartBeat = true; private final NodeId nodeId; @@ -364,12 +366,13 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { this(nodeId, context, hostName, cmPort, httpPort, node, capability, - nodeManagerVersion, null); + nodeManagerVersion, null, null); } public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, - String nodeManagerVersion, Resource physResource) { + String nodeManagerVersion, Resource physResource, + OverAllocationInfo overAllocationInfo) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -384,6 +387,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.nodeManagerVersion = nodeManagerVersion; this.timeStamp = 0; this.physicalResource = physResource; + this.overallocationInfo = overAllocationInfo; this.latestNodeHeartBeatResponse.setResponseId(0); @@ -533,6 +537,11 @@ public ResourceUtilization getNodeUtilization() { } } + @Override + public OverAllocationInfo getOverAllocationInfo() { + return this.overallocationInfo; + } + public void setNodeUtilization(ResourceUtilization nodeUtilization) { this.writeLock.lock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 5ac2ac5..d734baa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -298,37 +298,54 @@ public ContainerId matchContainerToOutstandingIncreaseReq( /** * Swaps the existing RMContainer's and the temp RMContainers internal - * container references after adjusting the resources in each. + * container references after adjusting the resources in each. This + * assumes the two containers are allocated on the same node. * @param tempRMContainer Temp RMContainer. * @param existingRMContainer Existing RMContainer. + * @param node the node on which the containers to be swapped are allocated * @param updateType Update Type. * @return Existing RMContainer after swapping the container references. */ public RMContainer swapContainer(RMContainer tempRMContainer, - RMContainer existingRMContainer, ContainerUpdateType updateType) { + RMContainer existingRMContainer, SchedulerNode node, + ContainerUpdateType updateType) { + assert existingRMContainer.getNodeId().equals(node.getNodeID()); + assert tempRMContainer.getNodeId().equals(node.getNodeID()); + ContainerId matchedContainerId = existingRMContainer.getContainerId(); // Swap updated container with the existing container Container tempContainer = tempRMContainer.getContainer(); + Container existingContainer = existingRMContainer.getContainer(); Resource updatedResource = createUpdatedResource( - tempContainer, existingRMContainer.getContainer(), updateType); + tempContainer, existingContainer, updateType); Resource resourceToRelease = createResourceToRelease( - existingRMContainer.getContainer(), updateType); + existingContainer, updateType); Container newContainer = Container.newInstance(matchedContainerId, - existingRMContainer.getContainer().getNodeId(), - existingRMContainer.getContainer().getNodeHttpAddress(), + existingContainer.getNodeId(), + existingContainer.getNodeHttpAddress(), updatedResource, - existingRMContainer.getContainer().getPriority(), null, + existingContainer.getPriority(), null, tempContainer.getExecutionType()); newContainer.setAllocationRequestId( - existingRMContainer.getContainer().getAllocationRequestId()); - newContainer.setVersion(existingRMContainer.getContainer().getVersion()); + existingContainer.getAllocationRequestId()); + newContainer.setVersion(existingContainer.getVersion()); - tempRMContainer.getContainer().setResource(resourceToRelease); - tempRMContainer.getContainer().setExecutionType( - existingRMContainer.getContainer().getExecutionType()); + Container updatedTempContainer = Container.newInstance( + tempContainer.getId(), tempContainer.getNodeId(), + tempContainer.getNodeHttpAddress(), + resourceToRelease, tempContainer.getPriority(), + tempContainer.getContainerToken(), + existingContainer.getExecutionType()); ((RMContainerImpl)existingRMContainer).setContainer(newContainer); + // notify schedulerNode of the update to correct resource accounting + node.containerUpdated(existingRMContainer, existingContainer); + + ((RMContainerImpl)tempRMContainer).setContainer(updatedTempContainer); + // notify SchedulerNode of the update to correct resource accounting + node.containerUpdated(tempRMContainer, tempContainer); + return existingRMContainer; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index db63cd8..4afaddd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -849,8 +849,10 @@ protected synchronized void addToNewlyAllocatedContainers( getRMContainer(matchedContainerId); if (existingRMContainer != null) { // swap containers + SchedulerNode node = rmContext.getScheduler().getSchedulerNode( + tempRMContainer.getNodeId()); existingRMContainer = getUpdateContext().swapContainer( - tempRMContainer, existingRMContainer, updateTpe); + tempRMContainer, existingRMContainer, node, updateTpe); getUpdateContext().removeFromOutstandingUpdate( tempRMContainer.getAllocatedSchedulerKey(), existingRMContainer.getContainer()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 05dbf1e..ef80105 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -57,30 +57,38 @@ private static final Log LOG = LogFactory.getLog(SchedulerNode.class); + private Resource capacity; private Resource unallocatedResource = Resource.newInstance(0, 0); - private Resource allocatedResource = Resource.newInstance(0, 0); - private Resource totalResource; + private RMContainer reservedContainer; - private volatile int numContainers; private volatile ResourceUtilization containersUtilization = ResourceUtilization.newInstance(0, 0, 0f); private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap<>(); + private final Map + allocatedContainers = new HashMap<>(); + + private volatile int numGuaranteedContainers = 0; + private Resource allocatedResourceGuaranteed = Resource.newInstance(0, 0); + + private volatile int numOpportunisticContainers = 0; + private Resource allocatedResourceOpportunistic = Resource.newInstance(0, 0); private final RMNode rmNode; private final String nodeName; + // The total amount of resources requested by containers that have been + // allocated but not yet launched on the node. + private Resource resourceAllocatedPendingLaunch = Resource.newInstance(0, 0); + private volatile Set labels = null; public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; this.unallocatedResource = Resources.clone(node.getTotalCapability()); - this.totalResource = Resources.clone(node.getTotalCapability()); + this.capacity = Resources.clone(node.getTotalCapability()); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -102,9 +110,9 @@ public RMNode getRMNode() { * @param resource Total resources on the node. */ public synchronized void updateTotalResource(Resource resource){ - this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, - this.allocatedResource); + this.capacity = Resources.clone(resource); + this.unallocatedResource = Resources.subtract(capacity, + this.allocatedResourceGuaranteed); } /** @@ -163,15 +171,61 @@ public void allocateContainer(RMContainer rmContainer) { protected synchronized void allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { Container container = rmContainer.getContainer(); - if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { - deductUnallocatedResource(container.getResource()); - ++numContainers; + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceAllocated( + container.getResource()); + numGuaranteedContainers++; + } else { + opportunisticContainerResourceAllocated( + container.getResource()); + numOpportunisticContainers++; } - - launchedContainers.put(container.getId(), + allocatedContainers.put(container.getId(), new ContainerInfo(rmContainer, launchedOnNode)); + + if (LOG.isDebugEnabled()) { + LOG.debug("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " and type " + + container.getExecutionType() + " on host " + toString()); + } + } + + /** + * Update resource bookkeeping upon an allocation of a GUARANTEED + * container. + * @param resource resource of the allocated GUARANTEED container. + */ + @VisibleForTesting + public synchronized void guaranteedContainerResourceAllocated( + Resource resource) { + if (containerResourceAllocated(resource, allocatedResourceGuaranteed)) { + Resources.subtractFrom(unallocatedResource, resource); + } + } + + private boolean containerResourceAllocated(Resource allocated, + Resource aggregatedResources) { + if (allocated == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return false; + } + Resources.addTo(resourceAllocatedPendingLaunch, allocated); + Resources.addTo(aggregatedResources, allocated); + return true; + } + + /** + * Updates resource bookkeeping upon an allocation of an OPPORTUNISTIC + * container. + * @param resource resource of the allocated OPPORTUNISTIC container. + */ + public synchronized void opportunisticContainerResourceAllocated( + Resource resource) { + containerResourceAllocated(resource, allocatedResourceOpportunistic); } + /** * Get unallocated resources on the node. * @return Unallocated resources on the node @@ -185,7 +239,7 @@ public synchronized Resource getUnallocatedResource() { * @return Allocated resources on the node */ public synchronized Resource getAllocatedResource() { - return this.allocatedResource; + return this.allocatedResourceGuaranteed; } /** @@ -193,30 +247,28 @@ public synchronized Resource getAllocatedResource() { * @return Total resources on the node. */ public synchronized Resource getTotalResource() { - return this.totalResource; + return this.capacity; } /** - * Check if a container is launched by this node. + * Check if a GUARANTEED container is launched by this node. * @return If the container is launched by the node. */ - public synchronized boolean isValidContainer(ContainerId containerId) { - if (launchedContainers.containsKey(containerId)) { - return true; - } - return false; + @VisibleForTesting + public synchronized boolean isValidGuaranteedContainer( + ContainerId containerId) { + return allocatedContainers.containsKey(containerId); } /** - * Update the resources of the node when releasing a container. - * @param container Container to release. + * Check if an OPPORTUNISTIC container is launched by this node. + * @param containerId id of the container to check + * @return If the container is launched by the node. */ - protected synchronized void updateResourceForReleasedContainer( - Container container) { - if (container.getExecutionType() == ExecutionType.GUARANTEED) { - addUnallocatedResource(container.getResource()); - --numContainers; - } + @VisibleForTesting + public synchronized boolean isValidOpportunisticContainer( + ContainerId containerId) { + return allocatedContainers.containsKey(containerId); } /** @@ -226,25 +278,36 @@ protected synchronized void updateResourceForReleasedContainer( */ public synchronized void releaseContainer(ContainerId containerId, boolean releasedByNode) { - ContainerInfo info = launchedContainers.get(containerId); - if (info == null) { + RMContainer rmContainer = getContainer(containerId); + if (rmContainer == null) { + LOG.warn("Invalid container " + containerId + " is released."); return; } + + ContainerInfo info = allocatedContainers.get(containerId); + // ony process if the container has not been launched on a node + // yet or it is released by node. if (!releasedByNode && info.launchedOnNode) { - // wait until node reports container has completed return; } - launchedContainers.remove(containerId); - Container container = info.container.getContainer(); - updateResourceForReleasedContainer(container); + Container container = rmContainer.getContainer(); + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceReleased(container); + allocatedContainers.remove(containerId); + numGuaranteedContainers--; + } else { + opportunisticContainerResourceReleased(container); + numOpportunisticContainers--; + allocatedContainers.remove(containerId); + } if (LOG.isDebugEnabled()) { - LOG.debug("Released container " + container.getId() + " of capacity " - + container.getResource() + " on host " + rmNode.getNodeAddress() - + ", which currently has " + numContainers + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available" + ", release resources=" + true); + if (LOG.isDebugEnabled()) { + LOG.debug("Released " + container.getExecutionType() + " container " + + containerId + " of " + "capacity " + container.getResource() + + " on node (" + toString() + ")" + ", release resources=" + true); + } } } @@ -253,41 +316,81 @@ public synchronized void releaseContainer(ContainerId containerId, * @param containerId ID of the launched container */ public synchronized void containerStarted(ContainerId containerId) { - ContainerInfo info = launchedContainers.get(containerId); - if (info != null) { + ContainerInfo info = allocatedContainers.get(containerId); + if (info != null && !info.launchedOnNode) { info.launchedOnNode = true; + Resources.subtractFrom(resourceAllocatedPendingLaunch, + info.container.getContainer().getResource()); } } /** - * Add unallocated resources to the node. This is used when unallocating a - * container. - * @param resource Resources to add. + * Update the resources of the node when releasing a GUARANTEED container. + * @param container Container to release. */ - private synchronized void addUnallocatedResource(Resource resource) { - if (resource == null) { + protected synchronized void guaranteedContainerResourceReleased( + Container container) { + assert container.getExecutionType() == ExecutionType.GUARANTEED; + + if (containerResourceReleased(container.getResource(), + allocatedResourceGuaranteed)) { + Resources.addTo(unallocatedResource, container.getResource()); + } + } + + private boolean containerResourceReleased(Resource released, + Resource aggregatedResource) { + if (released == null) { LOG.error("Invalid resource addition of null resource for " + rmNode.getNodeAddress()); - return; + return false; } - Resources.addTo(unallocatedResource, resource); - Resources.subtractFrom(allocatedResource, resource); + Resources.subtractFrom(aggregatedResource, released); + return true; } /** - * Deduct unallocated resources from the node. This is used when allocating a - * container. - * @param resource Resources to deduct. + * Update the resources of the node when releasing an OPPORTUNISTIC container. + * @param container Container to release. */ - @VisibleForTesting - public synchronized void deductUnallocatedResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; + private void opportunisticContainerResourceReleased( + Container container) { + assert container.getExecutionType() == ExecutionType.OPPORTUNISTIC; + + containerResourceReleased(container.getResource(), + allocatedResourceOpportunistic); + } + + /** + * Update a container that has been previously launched on the node. + * Container update (increase/decrease/promote/demote) is implemented + * by swapping the underlying container instance, so we need to call + * this method to update the resource accounting. + * @param rmContainer the rm container that has the new information + * @param oldContainer the container that has the previous information + */ + public synchronized void containerUpdated(RMContainer rmContainer, + Container oldContainer) { + assert rmContainer.getContainerId().equals(oldContainer.getId()); + + if (allocatedContainers.containsKey(oldContainer.getId())) { + if (oldContainer.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceReleased(oldContainer); + numGuaranteedContainers--; + } else { + opportunisticContainerResourceReleased(oldContainer); + numOpportunisticContainers--; + } + + Container newContainer = rmContainer.getContainer(); + if (newContainer.getExecutionType() == ExecutionType.GUARANTEED) { + guaranteedContainerResourceAllocated(newContainer.getResource()); + numGuaranteedContainers++; + } else { + opportunisticContainerResourceAllocated(newContainer.getResource()); + numOpportunisticContainers++; + } } - Resources.subtractFrom(unallocatedResource, resource); - Resources.addTo(allocatedResource, resource); } /** @@ -307,17 +410,26 @@ public abstract void reserveResource(SchedulerApplicationAttempt attempt, @Override public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" - + getNumContainers() + " available=" + getUnallocatedResource() - + " used=" + getAllocatedResource(); + return "host: " + rmNode.getNodeAddress() + " #guaranteed containers=" + + getNumGuaranteedContainers() + " #opportunistic containers=" + + getNumOpportunisticContainers() + " available=" + + getUnallocatedResource() + " used=" + getAllocatedResource(); } /** - * Get number of active containers on the node. - * @return Number of active containers on the node. + * Get number of active GUARANTEED containers on the node. + * @return Number of active OPPORTUNISTIC containers on the node. */ - public int getNumContainers() { - return numContainers; + public int getNumGuaranteedContainers() { + return numGuaranteedContainers; + } + + /** + * Get number of active OPPORTUNISTIC containers on the node. + * @return Number of active OPPORTUNISTIC containers on the node. + */ + public int getNumOpportunisticContainers() { + return numOpportunisticContainers; } /** @@ -325,8 +437,8 @@ public int getNumContainers() { * @return A copy of containers running on the node. */ public synchronized List getCopiedListOfRunningContainers() { - List result = new ArrayList<>(launchedContainers.size()); - for (ContainerInfo info : launchedContainers.values()) { + List result = new ArrayList<>(allocatedContainers.size()); + for (ContainerInfo info : allocatedContainers.values()) { result.add(info.container); } return result; @@ -336,12 +448,14 @@ public int getNumContainers() { * Get the containers running on the node with AM containers at the end. * @return A copy of running containers with AM containers at the end. */ - public synchronized List getRunningContainersWithAMsAtTheEnd() { + public synchronized List + getRunningGuaranteedContainersWithAMsAtTheEnd() { LinkedList result = new LinkedList<>(); - for (ContainerInfo info : launchedContainers.values()) { + for (ContainerInfo info : allocatedContainers.values()) { if(info.container.isAMContainer()) { result.addLast(info.container); - } else { + } else if (info.container.getExecutionType() == + ExecutionType.GUARANTEED){ result.addFirst(info.container); } } @@ -354,12 +468,9 @@ public int getNumContainers() { * @return The container for the specified container ID */ protected synchronized RMContainer getContainer(ContainerId containerId) { - RMContainer container = null; - ContainerInfo info = launchedContainers.get(containerId); - if (info != null) { - container = info.container; - } - return container; + ContainerInfo info = allocatedContainers.get(containerId); + + return info != null ? info.container : null; } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java index fa71a25..ea30d78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java @@ -31,11 +31,11 @@ private final Resource used; private final Resource avail; private final int num; - + public SchedulerNodeReport(SchedulerNode node) { this.used = node.getAllocatedResource(); this.avail = node.getUnallocatedResource(); - this.num = node.getNumContainers(); + this.num = node.getNumGuaranteedContainers(); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 7277779..911a057 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -144,9 +144,9 @@ public synchronized void markContainerToNonKillable(ContainerId containerId) { } @Override - protected synchronized void updateResourceForReleasedContainer( + protected synchronized void guaranteedContainerResourceReleased( Container container) { - super.updateResourceForReleasedContainer(container); + super.guaranteedContainerResourceReleased(container); if (killableContainers.containsKey(container.getId())) { Resources.subtractFrom(totalKillableResources, container.getResource()); killableContainers.remove(container.getId()); @@ -168,7 +168,7 @@ protected synchronized void allocateContainer(RMContainer rmContainer, final Container container = rmContainer.getContainer(); LOG.info("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + getRMNode().getNodeAddress() - + ", which has " + getNumContainers() + " containers, " + + ", which has " + getNumGuaranteedContainers() + " containers, " + getAllocatedResource() + " used and " + getUnallocatedResource() + " available after allocation"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index b3e59c5..8327a79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -159,7 +159,7 @@ private PreemptableContainers identifyContainersToPreemptOnNode( // Figure out list of containers to consider List containersToCheck = - node.getRunningContainersWithAMsAtTheEnd(); + node.getRunningGuaranteedContainersWithAMsAtTheEnd(); containersToCheck.removeAll(node.getContainersForPreemption()); // Initialize potential with unallocated but not reserved resources diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 44ec9c3..e8460ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -244,7 +244,7 @@ protected synchronized void allocateContainer(RMContainer rmContainer, final Container container = rmContainer.getContainer(); LOG.debug("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + getRMNode().getNodeAddress() - + ", which has " + getNumContainers() + " containers, " + + ", which has " + getNumGuaranteedContainers() + " containers, " + getAllocatedResource() + " used and " + getUnallocatedResource() + " available after allocation"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 611c7f2..9325efc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -262,6 +263,11 @@ public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + public OpportunisticContainersStatus getOpportunisticContainersStatus() { return OpportunisticContainersStatus.newInstance(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index c1cb4c1..b0da5df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -217,13 +217,14 @@ public void testSchedulerRecovery() throws Exception { Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); - assertTrue(schedulerNode1.isValidContainer(runningContainer - .getContainerId())); - assertFalse(schedulerNode1.isValidContainer(completedContainer - .getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + runningContainer.getContainerId())); + assertFalse(schedulerNode1.isValidGuaranteedContainer( + completedContainer.getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); @@ -360,13 +361,14 @@ public void testDynamicQueueRecovery() throws Exception { Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); - assertTrue( - schedulerNode1.isValidContainer(runningContainer.getContainerId())); - assertFalse( - schedulerNode1.isValidContainer(completedContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer( + runningContainer.getContainerId())); + assertFalse(schedulerNode1.isValidGuaranteedContainer( + completedContainer.getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 0c3130d..f6cb769 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -218,8 +218,8 @@ public void testCommitProposalForFailedAppAttempt() // nm1 runs 1 container(app1-container_01/AM) // nm2 runs 1 container(app1-container_02) - Assert.assertEquals(1, sn1.getNumContainers()); - Assert.assertEquals(1, sn2.getNumContainers()); + Assert.assertEquals(1, sn1.getNumGuaranteedContainers()); + Assert.assertEquals(1, sn2.getNumGuaranteedContainers()); // kill app attempt1 scheduler.handle( @@ -314,8 +314,8 @@ public void testCommitOutdatedReservedProposal() throws Exception { // nm1 runs 3 containers(app1-container_01/AM, app1-container_02, // app2-container_01/AM) // nm2 runs 1 container(app1-container_03) - Assert.assertEquals(3, sn1.getNumContainers()); - Assert.assertEquals(1, sn2.getNumContainers()); + Assert.assertEquals(3, sn1.getNumGuaranteedContainers()); + Assert.assertEquals(1, sn2.getNumGuaranteedContainers()); // reserve 1 container(app1-container_04) for app1 on nm1 ResourceRequest rr2 = ResourceRequest diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 740ef33..c56be29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -563,7 +563,7 @@ private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId, int numContainers) { CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); SchedulerNode node = cs.getSchedulerNode(nodeId); - Assert.assertEquals(numContainers, node.getNumContainers()); + Assert.assertEquals(numContainers, node.getNumGuaranteedContainers()); } /** @@ -1065,7 +1065,7 @@ public RMNodeLabelsManager createNodeLabelManager() { for (int i = 0; i < 50; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - if (schedulerNode1.getNumContainers() == 0) { + if (schedulerNode1.getNumGuaranteedContainers() == 0) { cycleWaited++; } } @@ -1131,7 +1131,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG + nodeIdStr + " ( Partition : [x]")); Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId()) - .getNumContainers()); + .getNumGuaranteedContainers()); rm1.close(); } @@ -1215,7 +1215,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x - Assert.assertEquals(10, schedulerNode1.getNumContainers()); + Assert.assertEquals(10, schedulerNode1.getNumGuaranteedContainers()); // check non-exclusive containers of LeafQueue is correctly updated LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); @@ -1943,7 +1943,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x - Assert.assertEquals(5, schedulerNode1.getNumContainers()); + Assert.assertEquals(5, schedulerNode1.getNumGuaranteedContainers()); SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() .getNodeReport(nm1.getNodeId()); @@ -2043,7 +2043,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x (non-exclusive) - Assert.assertEquals(3, schedulerNode1.getNumContainers()); + Assert.assertEquals(3, schedulerNode1.getNumGuaranteedContainers()); SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() .getNodeReport(nm1.getNodeId()); @@ -2074,7 +2074,7 @@ public RMNodeLabelsManager createNodeLabelManager() { cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); // app1 gets all resource in default partition - Assert.assertEquals(2, schedulerNode2.getNumContainers()); + Assert.assertEquals(2, schedulerNode2.getNumGuaranteedContainers()); // 3GB is used from label x quota. 2GB used from default label. // So total 2.5 GB is remaining. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 854a65c..5d31d4e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -319,7 +319,8 @@ public void run() { for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) { int i = ThreadLocalRandom.current().nextInt(-30, 30); synchronized (scheduler) { - node.deductUnallocatedResource(Resource.newInstance(i * 1024, i)); + node.guaranteedContainerResourceAllocated( + Resource.newInstance(i * 1024, i)); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 6b2109d..cce2715 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3291,12 +3291,16 @@ private void testAMStrictLocality(boolean node, boolean invalid) scheduler.handle(node2UpdateEvent); if (invalid) { assertEquals(0, app.getLiveContainers().size()); - assertEquals(0, scheduler.getNode(node2.getNodeID()).getNumContainers()); - assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers()); + assertEquals(0, + scheduler.getNode(node2.getNodeID()).getNumGuaranteedContainers()); + assertEquals(0, + scheduler.getNode(node1.getNodeID()).getNumGuaranteedContainers()); } else { assertEquals(1, app.getLiveContainers().size()); - assertEquals(1, scheduler.getNode(node2.getNodeID()).getNumContainers()); - assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers()); + assertEquals(1, + scheduler.getNode(node2.getNodeID()).getNumGuaranteedContainers()); + assertEquals(0, + scheduler.getNode(node1.getNodeID()).getNumGuaranteedContainers()); } }