diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 8962aba..25910ca 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -206,6 +207,11 @@ public ResourceUtilization getNodeUtilization() { } @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + + @Override public long getUntrackedTimeStamp() { return 0; } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index d7b159c..ddd316a 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -195,6 +196,11 @@ public ResourceUtilization getNodeUtilization() { } @Override + public OverAllocationInfo getOverAllocationInfo() { + return node.getOverAllocationInfo(); + } + + @Override public long getUntrackedTimeStamp() { return 0; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index aa7f524..433e8d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -387,7 +387,8 @@ public RegisterNodeManagerResponse registerNodeManager( .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion, physicalResource); + resolve(host), capability, nodeManagerVersion, physicalResource, + request.getOverAllocationInfo()); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 29680e5..2d2489d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -97,6 +97,8 @@ ExecutionType getExecutionType(); + void setExecutionType(ExecutionType type); + /** * If the container was allocated by a container other than the Resource * Manager (e.g., the distributed scheduler in the NM diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 1e9463a..7f9d1e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -836,6 +836,11 @@ public ExecutionType getExecutionType() { } @Override + public void setExecutionType(ExecutionType type) { + container.setExecutionType(type); + } + + @Override public boolean isRemotelyAllocated() { return isExternallyAllocated; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 86f8679..ac01462 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; /** * Node managers information on available resources @@ -114,6 +115,12 @@ public ResourceUtilization getNodeUtilization(); /** + * the overallocation threshold. + * @return overallocation threshold + */ + OverAllocationInfo getOverAllocationInfo(); + + /** * the physical resources in the node. * @return the physical resources in the node. 
*/ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1f121f8..c45e210 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; @@ -108,6 +109,7 @@ private final WriteLock writeLock; private final ConcurrentLinkedQueue nodeUpdateQueue; + private final OverAllocationInfo overallocationInfo; private volatile boolean nextHeartBeat = true; private final NodeId nodeId; @@ -359,12 +361,13 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { this(nodeId, context, hostName, cmPort, httpPort, node, capability, - nodeManagerVersion, null); + nodeManagerVersion, null, null); } public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, - String nodeManagerVersion, Resource physResource) { + String nodeManagerVersion, Resource physResource, + OverAllocationInfo overAllocationInfo) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -379,6 +382,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.nodeManagerVersion = nodeManagerVersion; this.timeStamp = 0; this.physicalResource = physResource; + this.overallocationInfo = overAllocationInfo; this.latestNodeHeartBeatResponse.setResponseId(0); @@ -528,6 +532,11 @@ public ResourceUtilization getNodeUtilization() { } } + @Override + public OverAllocationInfo getOverAllocationInfo() { + return this.overallocationInfo; + } + public void setNodeUtilization(ResourceUtilization nodeUtilization) { this.writeLock.lock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 79caab0..b1e3366 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -288,7 +288,7 @@ protected void 
containerLaunchedOnNode( } application.containerLaunchedOnNode(containerId, node.getNodeID()); - node.containerStarted(containerId); + node.containerLaunched(containerId); } finally { readLock.unlock(); } @@ -513,6 +513,7 @@ public void recoverContainersOnNode(List containerReports, private RMContainer recoverAndCreateContainer(NMContainerStatus status, RMNode node) { + // TODO may need to add ExecutionType in NMContainerStatus and recover Container container = Container.newInstance(status.getContainerId(), node.getNodeID(), node.getHttpAddress(), status.getAllocatedResource(), @@ -1056,9 +1057,11 @@ protected void nodeUpdate(RMNode nm) { updateSchedulerHealthInformation(releasedResources, releasedContainers); updateNodeResourceUtilization(nm); + SchedulerNode node = getSchedulerNode(nm.getNodeID()); + node.resetAllocationThisHeartbeat(); + // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { - SchedulerNode node = getNode(nm.getNodeID()); LOG.debug("Node being looked for scheduling " + nm + " availableResource: " + node.getUnallocatedResource()); } @@ -1074,8 +1077,8 @@ public Resource getNormalizedResource(Resource requestedResource) { } /** - * Normalize a list of resource requests. - * + * Normalize a list of resource requests. May need to filter out + * OPPORTUNISTIC container requests here? * @param asks resource requests */ protected void normalizeRequests(List asks) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 8acf7d5..84ffecf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -91,7 +92,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, AbstractUsersManager abstractUsersManager, - long epoch, ResourceUsage appResourceUsage) { + long epoch, ResourceUsage appResourceUsage, RMContext context) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; @@ -102,7 +103,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, this.appResourceUsage = appResourceUsage; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - updateContext = new ContainerUpdateContext(this); + updateContext = new ContainerUpdateContext(this, context); readLock = lock.readLock(); writeLock = lock.writeLock(); } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 5ac2ac5..dc090c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer .RMContainerImpl; @@ -64,8 +65,10 @@ new HashMap<>(); private final AppSchedulingInfo appSchedulingInfo; - ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) { + private final RMContext context; + ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo, RMContext context) { this.appSchedulingInfo = appSchedulingInfo; + this.context = context; } /** @@ -309,26 +312,45 @@ public RMContainer swapContainer(RMContainer tempRMContainer, ContainerId matchedContainerId = existingRMContainer.getContainerId(); // Swap updated container with the existing container Container tempContainer = tempRMContainer.getContainer(); + Container existingContainer = existingRMContainer.getContainer(); Resource updatedResource = createUpdatedResource( - tempContainer, existingRMContainer.getContainer(), updateType); + tempContainer, existingContainer, updateType); Resource resourceToRelease = createResourceToRelease( - existingRMContainer.getContainer(), updateType); + existingContainer, updateType); Container newContainer = Container.newInstance(matchedContainerId, - existingRMContainer.getContainer().getNodeId(), - existingRMContainer.getContainer().getNodeHttpAddress(), + existingContainer.getNodeId(), + existingContainer.getNodeHttpAddress(), updatedResource, - existingRMContainer.getContainer().getPriority(), null, + existingContainer.getPriority(), null, tempContainer.getExecutionType()); newContainer.setAllocationRequestId( - existingRMContainer.getContainer().getAllocationRequestId()); - newContainer.setVersion(existingRMContainer.getContainer().getVersion()); + existingContainer.getAllocationRequestId()); + newContainer.setVersion(existingContainer.getVersion()); - tempRMContainer.getContainer().setResource(resourceToRelease); - tempRMContainer.getContainer().setExecutionType( - existingRMContainer.getContainer().getExecutionType()); + Container updatedTempContainer = Container.newInstance( + tempContainer.getId(), tempContainer.getNodeId(), + tempContainer.getNodeHttpAddress(), + resourceToRelease, tempContainer.getPriority(), + tempContainer.getContainerToken(), + existingContainer.getExecutionType()); + + ((RMContainerImpl)tempRMContainer).setContainer(updatedTempContainer); + // notify SchedulerNode of the update to correct resource accounting + SchedulerNode oldNode = context.getScheduler().getSchedulerNode( + 
existingRMContainer.getAllocatedNode()); + if (oldNode != null) { + oldNode.updateContainer(tempRMContainer, tempContainer); + } ((RMContainerImpl)existingRMContainer).setContainer(newContainer); + // notify schedulerNode of the update to correct resource accounting + SchedulerNode newNode = context.getScheduler().getSchedulerNode( + tempRMContainer.getNodeId()); + if (newNode != null) { + newNode.updateContainer(existingRMContainer, existingContainer); + } + return existingRMContainer; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 397d507..05995d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -206,7 +206,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, this.rmContext = rmContext; this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage); + abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage, rmContext); this.queue = queue; this.pendingRelease = Collections.newSetFromMap( new ConcurrentHashMap()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 272537c..56d5762 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -57,30 +60,41 @@ private static final Log LOG = LogFactory.getLog(SchedulerNode.class); + private Resource capacity; private Resource unallocatedResource = Resource.newInstance(0, 0); - private Resource allocatedResource = 
Resource.newInstance(0, 0); - private Resource totalResource; + private RMContainer reservedContainer; - private volatile int numContainers; private volatile ResourceUtilization containersUtilization = ResourceUtilization.newInstance(0, 0, 0f); private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); - /* set of containers that are allocated containers */ - private final Map launchedContainers = + /* set of containers that are allocated GUARANTEED containers */ + private final Map launchedGuaranteedContainers = new HashMap<>(); + private AtomicInteger numGuaranteedContainers = new AtomicInteger(0); + private Resource allocatedResourceGuaranteed = Resource.newInstance(0, 0); + + /* set of containers that are allocated OPPORTUNISTIC containers */ + private final Map + launchedOpportunisticContainers = new HashMap<>(); + private AtomicInteger numOpportunisticContainers = new AtomicInteger(0); + private Resource allocatedResourceOpportunistic = Resource.newInstance(0, 0); private final RMNode rmNode; private final String nodeName; + // The total amount of resources allocated in the current NM heartbeat + // This need to be reset each time before processing a NM node update + private Resource allocationInThisHeartbeat = Resource.newInstance(0, 0); + private volatile Set labels = null; public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; this.unallocatedResource = Resources.clone(node.getTotalCapability()); - this.totalResource = Resources.clone(node.getTotalCapability()); + this.capacity = Resources.clone(node.getTotalCapability()); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -102,9 +116,9 @@ public RMNode getRMNode() { * @param resource Total resources on the node. */ public synchronized void updateTotalResource(Resource resource){ - this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, - this.allocatedResource); + this.capacity = resource; + this.unallocatedResource = Resources.subtract(capacity, + this.allocatedResourceGuaranteed); } /** @@ -162,24 +176,87 @@ public void allocateContainer(RMContainer rmContainer) { */ protected synchronized void allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { + if (isGuaranteed(rmContainer)) { + guaranteedContainerAllocated(rmContainer, launchedOnNode); + } else { + opportunisticContainerAllocated(rmContainer, launchedOnNode); + } + + if (LOG.isDebugEnabled()) { + Container container = rmContainer.getContainer(); + LOG.debug("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " and type " + + container.getExecutionType() + " on host " + toString()); + } + } + + private static boolean isGuaranteed(RMContainer rmContainer) { + return rmContainer.getExecutionType().equals(ExecutionType.GUARANTEED); + } + + /** + * Handles an allocation of a GUARANTEED container. 
+ */ + private void guaranteedContainerAllocated( + RMContainer rmContainer, boolean launchedOnNode) { + assert isGuaranteed(rmContainer); + Container container = rmContainer.getContainer(); - if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { - deductUnallocatedResource(container.getResource()); - ++numContainers; + guaranteedContainerResourceAllocated(container.getResource()); + launchedGuaranteedContainers.put(container.getId(), + new ContainerInfo(rmContainer, launchedOnNode)); + numGuaranteedContainers.incrementAndGet(); + } + + /** + * Update resource bookkeeping upon an allocation of a GUARANTEED + * container. + * @param resource resource of the allocated GUARANTEED container. + */ + @VisibleForTesting + public synchronized void guaranteedContainerResourceAllocated( + Resource resource) { + if (resource == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return; } + Resources.subtractFrom(unallocatedResource, resource); + Resources.addTo(allocationInThisHeartbeat, resource); + Resources.addTo(allocatedResourceGuaranteed, resource); + } - launchedContainers.put(container.getId(), + /** + * Handles an allocation of an OPPORTUNISTIC container. + */ + private void opportunisticContainerAllocated( + RMContainer rmContainer, boolean launchedOnNode) { + assert !isGuaranteed(rmContainer); + + Container container = rmContainer.getContainer(); + opportunisticContainerResourceAllocated(container.getResource()); + launchedOpportunisticContainers.put(container.getId(), new ContainerInfo(rmContainer, launchedOnNode)); + numOpportunisticContainers.incrementAndGet(); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Assigned container " + container.getId() + " of capacity " - + container.getResource() + " on host " + rmNode.getNodeAddress() - + ", which has " + numContainers + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available after allocation"); + /** + * Updates resource bookkeeping upon an allocation of an OPPORTUNISTIC + * container. + * @param resource resource of the allocated OPPORTUNISTIC container. + */ + public synchronized void opportunisticContainerResourceAllocated( + Resource resource) { + if (resource == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return; } + Resources.addTo(allocationInThisHeartbeat, resource); + Resources.addTo(allocatedResourceOpportunistic, resource); } + /** * Get unallocated resources on the node. * @return Unallocated resources on the node @@ -193,7 +270,7 @@ public synchronized Resource getUnallocatedResource() { * @return Allocated resources on the node */ public synchronized Resource getAllocatedResource() { - return this.allocatedResource; + return this.allocatedResourceGuaranteed; } /** @@ -201,30 +278,27 @@ public synchronized Resource getAllocatedResource() { * @return Total resources on the node. */ public synchronized Resource getTotalResource() { - return this.totalResource; + return this.capacity; } /** - * Check if a container is launched by this node. + * Check if a GUARANTEED container is launched by this node. * @return If the container is launched by the node. 
*/ - public synchronized boolean isValidContainer(ContainerId containerId) { - if (launchedContainers.containsKey(containerId)) { - return true; - } - return false; + @VisibleForTesting + public synchronized boolean isValidGuaranteedContainer( + ContainerId containerId) { + return launchedGuaranteedContainers.containsKey(containerId); } /** - * Update the resources of the node when releasing a container. - * @param container Container to release. + * Check if an OPPORTUNISTIC container is launched by this node. + * @return If the container is launched by the node. */ - protected synchronized void updateResourceForReleasedContainer( - Container container) { - if (container.getExecutionType() == ExecutionType.GUARANTEED) { - addUnallocatedResource(container.getResource()); - --numContainers; - } + @VisibleForTesting + public synchronized boolean isValidOpportunisticContainer( + ContainerId containerId) { + return launchedOpportunisticContainers.containsKey(containerId); } /** @@ -234,25 +308,16 @@ protected synchronized void updateResourceForReleasedContainer( */ public synchronized void releaseContainer(ContainerId containerId, boolean releasedByNode) { - ContainerInfo info = launchedContainers.get(containerId); - if (info == null) { - return; - } - if (!releasedByNode && info.launchedOnNode) { - // wait until node reports container has completed + RMContainer rmContainer = getContainer(containerId); + if (rmContainer == null) { + LOG.warn("Invalid container " + containerId + " is released."); return; } - launchedContainers.remove(containerId); - Container container = info.container.getContainer(); - updateResourceForReleasedContainer(container); - - if (LOG.isDebugEnabled()) { - LOG.debug("Released container " + container.getId() + " of capacity " - + container.getResource() + " on host " + rmNode.getNodeAddress() - + ", which currently has " + numContainers + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available" + ", release resources=" + true); + if (isGuaranteed(rmContainer)) { + guaranteedContainerReleased(rmContainer, releasedByNode); + } else { + opportunisticContainerReleased(rmContainer, releasedByNode); } } @@ -260,42 +325,143 @@ public synchronized void releaseContainer(ContainerId containerId, * Inform the node that a container has launched. * @param containerId ID of the launched container */ - public synchronized void containerStarted(ContainerId containerId) { - ContainerInfo info = launchedContainers.get(containerId); + public synchronized void containerLaunched(ContainerId containerId) { + ContainerInfo info = launchedGuaranteedContainers.get(containerId); + if (info == null) { + info = launchedOpportunisticContainers.get(containerId); + } + if (info != null) { info.launchedOnNode = true; } } + private void guaranteedContainerReleased(RMContainer rmContainer, + boolean releasedByNode) { + assert isGuaranteed(rmContainer); + + ContainerId containerId = rmContainer.getContainerId(); + ContainerInfo info = launchedGuaranteedContainers.get(containerId); + // only process if the container has not been launched on a node yet + // or it is released by node. + if (releasedByNode || !info.launchedOnNode) { + Container container = rmContainer.getContainer(); + + guaranteedContainerResourceReleased(container); + launchedGuaranteedContainers.remove(containerId); + numGuaranteedContainers.decrementAndGet(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Released GUARANTEED container " + containerId + " of " + + "capacity " + container.getResource() + " on node (" + toString() + + ")" + ", release resources=" + true); + } + } + } + /** - * Add unallocated resources to the node. This is used when unallocating a - * container. - * @param resource Resources to add. + * Update the resources of the node when releasing a GUARANTEED container. + * @param container Container to release. */ - private synchronized void addUnallocatedResource(Resource resource) { + protected synchronized void guaranteedContainerResourceReleased( + Container container) { + assert container.getExecutionType().equals(ExecutionType.GUARANTEED); + + Resource resource = container.getResource(); if (resource == null) { LOG.error("Invalid resource addition of null resource for " + rmNode.getNodeAddress()); return; } Resources.addTo(unallocatedResource, resource); - Resources.subtractFrom(allocatedResource, resource); + Resources.subtractFrom(allocatedResourceGuaranteed, resource); + } + + + private void opportunisticContainerReleased(RMContainer rmContainer, + boolean releasedByNode) { + assert !isGuaranteed(rmContainer); + + ContainerId containerId = rmContainer.getContainerId(); + ContainerInfo info = launchedOpportunisticContainers.get(containerId); + // only process if the container has not been launched on a node yet + // or it is released by node. + if (releasedByNode || !info.launchedOnNode) { + Container container = rmContainer.getContainer(); + + opportunisticContainerResourceReleased(container); + launchedOpportunisticContainers.remove(containerId); + numOpportunisticContainers.decrementAndGet(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Released OPPORTUNISTIC container " + containerId + + " of capacity " + container.getResource() + " on node (" + + toString() + ")" + ", release resources=" + true); + } + } } /** - * Deduct unallocated resources from the node. This is used when allocating a - * container. - * @param resource Resources to deduct. + * Update the resources of the node when releasing an OPPORTUNISTIC container. + * @param container Container to release. */ - @VisibleForTesting - public synchronized void deductUnallocatedResource(Resource resource) { + private void opportunisticContainerResourceReleased( + Container container) { + assert container.getExecutionType().equals(ExecutionType.OPPORTUNISTIC); + + Resource resource = container.getResource(); if (resource == null) { - LOG.error("Invalid deduction of null resource for " + LOG.error("Invalid resource addition of null resource for " + rmNode.getNodeAddress()); return; } - Resources.subtractFrom(unallocatedResource, resource); - Resources.addTo(allocatedResource, resource); + Resources.subtractFrom(allocatedResourceOpportunistic, resource); + } + + public synchronized void resetAllocationThisHeartbeat() { + this.allocationInThisHeartbeat = Resource.newInstance(0, 0); + } + + /** + * Update a container that has been previously launched on the node. 
+ * @param rmContainer the container that has the new information + * @param oldContainer the container that has the previous information + */ + public void updateContainer(RMContainer rmContainer, Container oldContainer) { + if (launchedGuaranteedContainers.containsKey(oldContainer.getId())) { + ContainerInfo oldContainerInfo = launchedGuaranteedContainers.get( + oldContainer.getId()); + guaranteedContainerResourceReleased(oldContainer); + launchedGuaranteedContainers.remove(oldContainer.getId()); + numGuaranteedContainers.decrementAndGet(); + + if(isGuaranteed(rmContainer)) { + guaranteedContainerAllocated( + rmContainer, oldContainerInfo.launchedOnNode); + } else { + opportunisticContainerAllocated( + rmContainer, oldContainerInfo.launchedOnNode); + } + } else if ( + launchedOpportunisticContainers.containsKey(oldContainer.getId())) { + ContainerInfo oldContainerInfo = launchedOpportunisticContainers.get( + oldContainer.getId()); + opportunisticContainerResourceReleased(oldContainer); + launchedOpportunisticContainers.remove(oldContainer.getId()); + numOpportunisticContainers.decrementAndGet(); + + if(isGuaranteed(rmContainer)) { + guaranteedContainerAllocated( + rmContainer, oldContainerInfo.launchedOnNode); + } else { + opportunisticContainerAllocated( + rmContainer, oldContainerInfo.launchedOnNode); + } + } } /** @@ -315,17 +481,26 @@ public abstract void reserveResource(SchedulerApplicationAttempt attempt, @Override public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" - + getNumContainers() + " available=" + getUnallocatedResource() - + " used=" + getAllocatedResource(); + return "host: " + rmNode.getNodeAddress() + " #containers=" + + getNumGuaranteedContainers() + " #opportunistic containers=" + + getNumOpportunisticContainers() + " available=" + + getUnallocatedResource() + " used=" + getAllocatedResource(); } /** - * Get number of active containers on the node. - * @return Number of active containers on the node. + * Get number of active GUARANTEED containers on the node. + * @return Number of active GUARANTEED containers on the node. */ - public int getNumContainers() { - return numContainers; + public int getNumGuaranteedContainers() { + return numGuaranteedContainers.get(); + } + + /** + * Get number of active OPPORTUNISTIC containers on the node. + * @return Number of active OPPORTUNISTIC containers on the node. + */ + public int getNumOpportunisticContainers() { + return numOpportunisticContainers.get(); } /** @@ -333,8 +508,12 @@ public int getNumContainers() { * @return A copy of containers running on the node. */ public synchronized List getCopiedListOfRunningContainers() { - List result = new ArrayList<>(launchedContainers.size()); - for (ContainerInfo info : launchedContainers.values()) { + List result = new ArrayList<>(launchedGuaranteedContainers.size() + + launchedOpportunisticContainers.size()); + for (ContainerInfo info : launchedGuaranteedContainers.values()) { + result.add(info.container); + } + for (ContainerInfo info : launchedOpportunisticContainers.values()) { result.add(info.container); } return result; @@ -344,9 +523,10 @@ public int getNumContainers() { * Get the containers running on the node with AM containers at the end. * @return A copy of running containers with AM containers at the end. 
*/ - public synchronized List getRunningContainersWithAMsAtTheEnd() { + public synchronized List + getRunningGuaranteedContainersWithAMsAtTheEnd() { LinkedList result = new LinkedList<>(); - for (ContainerInfo info : launchedContainers.values()) { + for (ContainerInfo info : launchedGuaranteedContainers.values()) { if(info.container.isAMContainer()) { result.addLast(info.container); } else { @@ -362,12 +542,17 @@ public int getNumContainers() { * @return The container for the specified container ID */ protected synchronized RMContainer getContainer(ContainerId containerId) { - RMContainer container = null; - ContainerInfo info = launchedContainers.get(containerId); + ContainerInfo info = launchedGuaranteedContainers.get(containerId); + if (info != null) { + return info.container; + } + + info = launchedOpportunisticContainers.get(containerId); if (info != null) { - container = info.container; + return info.container; } - return container; + + return null; } /** @@ -445,6 +630,40 @@ public ResourceUtilization getAggregatedContainersUtilization() { return this.containersUtilization; } + public synchronized Resource allowedOverAllocation() { + OverAllocationInfo overAllocationInfo = rmNode.getOverAllocationInfo(); + if (overAllocationInfo == null) { + LOG.debug("Overallocation is disabled on node: " + rmNode.getHostName()); + return Resources.none(); + } + + ResourceUtilization projectedNodeUtilization = ResourceUtilization. + newInstance(getNodeUtilization()); + // account for resources allocated in this heartbeat + projectedNodeUtilization.addTo( + (int) (allocationInThisHeartbeat.getMemorySize()), 0, + (float) allocationInThisHeartbeat.getVirtualCores() / + capacity.getVirtualCores()); + + ResourceThresholds thresholds = + overAllocationInfo.getOverAllocationThresholds(); + Resource overAllocationThreshold = Resources.createResource( + (long) (capacity.getMemorySize() * thresholds.getMemoryThreshold()), + (int) (capacity.getVirtualCores() * thresholds.getCpuThreshold())); + long allowedMemory = Math.max(0, overAllocationThreshold.getMemorySize() + - projectedNodeUtilization.getPhysicalMemory()); + int allowedCpu = Math.max(0, (int) + (overAllocationThreshold.getVirtualCores() - + projectedNodeUtilization.getCPU() * capacity.getVirtualCores())); + + Resource resourceAllowedForOpportunisticContainers = + Resources.createResource(allowedMemory, allowedCpu); + + // TODO cap the resources allocated to OPPORTUNISTIC containers on a node + // in terms of its capacity. i.e. return min(max_radio * capacity, allowed) + return resourceAllowedForOpportunisticContainers; + } + /** * Set the resource utilization of the node. This includes the containers. * @param nodeUtilization Resource utilization of the node. 
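To make the arithmetic in the new SchedulerNode#allowedOverAllocation() above easier to follow, here is a minimal, standalone sketch of the same headroom computation. The class, method names and numbers are illustrative only; they are not part of the patch and deliberately avoid the YARN record classes. CPU utilization is treated as a fraction of the whole node, matching how the patch multiplies getCPU() by the node's vcore count.

public final class OverAllocationHeadroomSketch {

  // Memory headroom: capacity * threshold minus the projected usage, where
  // the projection is current utilization plus whatever has already been
  // allocated since the last NM heartbeat (allocationInThisHeartbeat).
  static long allowedMemoryMb(long capacityMb, double memoryThreshold,
      long utilizedMb, long allocatedThisHeartbeatMb) {
    long projectedMb = utilizedMb + allocatedThisHeartbeatMb;
    long thresholdMb = (long) (capacityMb * memoryThreshold);
    return Math.max(0, thresholdMb - projectedMb);
  }

  // CPU headroom, with utilization expressed as a fraction of the whole node.
  static int allowedVcores(int capacityVcores, double cpuThreshold,
      float cpuUtilizationFraction, int vcoresAllocatedThisHeartbeat) {
    float projectedFraction = cpuUtilizationFraction
        + (float) vcoresAllocatedThisHeartbeat / capacityVcores;
    int thresholdVcores = (int) (capacityVcores * cpuThreshold);
    return Math.max(0,
        thresholdVcores - (int) (projectedFraction * capacityVcores));
  }

  public static void main(String[] args) {
    // 8 GB / 8 vcore node, 90% memory and 80% CPU thresholds, 5 GB and
    // 50% CPU utilized, 1 GB / 1 vcore allocated since the last heartbeat.
    System.out.println(allowedMemoryMb(8192, 0.9, 5120, 1024)); // 1228
    System.out.println(allowedVcores(8, 0.8, 0.5f, 1));         // 1
  }
}

With those example inputs the sketch reports 1228 MB and 1 vcore of headroom that could still be handed to OPPORTUNISTIC containers before the over-allocation thresholds are reached.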
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java index fa71a25..5d8c09b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java @@ -35,7 +35,7 @@ public SchedulerNodeReport(SchedulerNode node) { this.used = node.getAllocatedResource(); this.avail = node.getUnallocatedResource(); - this.num = node.getNumContainers(); + this.num = node.getNumGuaranteedContainers(); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index c26a11b..def69e4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -144,9 +144,9 @@ public synchronized void markContainerToNonKillable(ContainerId containerId) { } @Override - protected synchronized void updateResourceForReleasedContainer( + protected synchronized void guaranteedContainerResourceReleased( Container container) { - super.updateResourceForReleasedContainer(container); + super.guaranteedContainerResourceReleased(container); if (killableContainers.containsKey(container.getId())) { Resources.subtractFrom(totalKillableResources, container.getResource()); killableContainers.remove(container.getId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index b3e59c5..8327a79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -159,7 +159,7 @@ private PreemptableContainers identifyContainersToPreemptOnNode( // Figure out list of containers to consider List containersToCheck = - node.getRunningContainersWithAMsAtTheEnd(); + node.getRunningGuaranteedContainersWithAMsAtTheEnd(); containersToCheck.removeAll(node.getContainersForPreemption()); // Initialize potential with unallocated but not reserved 
resources diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 91170d1..f10412f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -262,6 +263,11 @@ public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + public OpportunisticContainersStatus getOpportunisticContainersStatus() { return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 2c37f44..59bf6c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -213,13 +213,13 @@ public void testSchedulerRecovery() throws Exception { Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); - assertTrue(schedulerNode1.isValidContainer(runningContainer + assertTrue(schedulerNode1.isValidGuaranteedContainer(amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer(runningContainer .getContainerId())); - assertFalse(schedulerNode1.isValidContainer(completedContainer + assertFalse(schedulerNode1.isValidGuaranteedContainer(completedContainer .getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); @@ -356,13 +356,13 @@ public void testDynamicQueueRecovery() throws Exception { Resource nmResource = Resource.newInstance(nm1.getMemory(), nm1.getvCores()); - assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidGuaranteedContainer(amContainer.getContainerId())); assertTrue( - schedulerNode1.isValidContainer(runningContainer.getContainerId())); + 
schedulerNode1.isValidGuaranteedContainer(runningContainer.getContainerId())); assertFalse( - schedulerNode1.isValidContainer(completedContainer.getContainerId())); + schedulerNode1.isValidGuaranteedContainer(completedContainer.getContainerId())); // 2 launched containers, 1 completed container - assertEquals(2, schedulerNode1.getNumContainers()); + assertEquals(2, schedulerNode1.getNumGuaranteedContainers()); assertEquals(Resources.subtract(nmResource, usedResources), schedulerNode1.getUnallocatedResource()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index bb29889..b5e9d10 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -48,7 +48,7 @@ public void testBacklistChanged() { FSLeafQueue queue = mock(FSLeafQueue.class); doReturn("test").when(queue).getQueueName(); AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo( - appAttemptId, "test", queue, null, 0, new ResourceUsage()); + appAttemptId, "test", queue, null, 0, new ResourceUsage(), null); appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList(), new ArrayList()); @@ -120,7 +120,7 @@ public void testSchedulerKeyAccounting() { doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); AppSchedulingInfo info = new AppSchedulingInfo( appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, - new ResourceUsage()); + new ResourceUsage(), null); Assert.assertEquals(0, info.getSchedulerKeys().size()); Priority pri1 = Priority.newInstance(1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 0c3130d..f6cb769 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -218,8 +218,8 @@ public void testCommitProposalForFailedAppAttempt() // nm1 runs 1 container(app1-container_01/AM) // nm2 runs 1 container(app1-container_02) - Assert.assertEquals(1, sn1.getNumContainers()); - Assert.assertEquals(1, sn2.getNumContainers()); + Assert.assertEquals(1, sn1.getNumGuaranteedContainers()); + Assert.assertEquals(1, sn2.getNumGuaranteedContainers()); // kill app attempt1 scheduler.handle( @@ -314,8 +314,8 @@ public void testCommitOutdatedReservedProposal() throws Exception { // nm1 runs 3 containers(app1-container_01/AM, app1-container_02, // 
app2-container_01/AM) // nm2 runs 1 container(app1-container_03) - Assert.assertEquals(3, sn1.getNumContainers()); - Assert.assertEquals(1, sn2.getNumContainers()); + Assert.assertEquals(3, sn1.getNumGuaranteedContainers()); + Assert.assertEquals(1, sn2.getNumGuaranteedContainers()); // reserve 1 container(app1-container_04) for app1 on nm1 ResourceRequest rr2 = ResourceRequest diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 740ef33..c56be29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -563,7 +563,7 @@ private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId, int numContainers) { CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); SchedulerNode node = cs.getSchedulerNode(nodeId); - Assert.assertEquals(numContainers, node.getNumContainers()); + Assert.assertEquals(numContainers, node.getNumGuaranteedContainers()); } /** @@ -1065,7 +1065,7 @@ public RMNodeLabelsManager createNodeLabelManager() { for (int i = 0; i < 50; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - if (schedulerNode1.getNumContainers() == 0) { + if (schedulerNode1.getNumGuaranteedContainers() == 0) { cycleWaited++; } } @@ -1131,7 +1131,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG + nodeIdStr + " ( Partition : [x]")); Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId()) - .getNumContainers()); + .getNumGuaranteedContainers()); rm1.close(); } @@ -1215,7 +1215,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x - Assert.assertEquals(10, schedulerNode1.getNumContainers()); + Assert.assertEquals(10, schedulerNode1.getNumGuaranteedContainers()); // check non-exclusive containers of LeafQueue is correctly updated LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); @@ -1943,7 +1943,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x - Assert.assertEquals(5, schedulerNode1.getNumContainers()); + Assert.assertEquals(5, schedulerNode1.getNumGuaranteedContainers()); SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() .getNodeReport(nm1.getNodeId()); @@ -2043,7 +2043,7 @@ public RMNodeLabelsManager createNodeLabelManager() { } // app1 gets all resource in partition=x (non-exclusive) - Assert.assertEquals(3, schedulerNode1.getNumContainers()); + Assert.assertEquals(3, schedulerNode1.getNumGuaranteedContainers()); SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() .getNodeReport(nm1.getNodeId()); @@ -2074,7 +2074,7 @@ public RMNodeLabelsManager createNodeLabelManager() { cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); // app1 gets all resource in default partition - Assert.assertEquals(2, 
schedulerNode2.getNumContainers()); + Assert.assertEquals(2, schedulerNode2.getNumGuaranteedContainers()); // 3GB is used from label x quota. 2GB used from default label. // So total 2.5 GB is remaining. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 9efa83d..aa7b105 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -324,7 +324,7 @@ public void run() { for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) { int i = ThreadLocalRandom.current().nextInt(-30, 30); synchronized (scheduler) { - node.deductUnallocatedResource(Resource.newInstance(i * 1024, i)); + node.guaranteedContainerResourceAllocated(Resource.newInstance(i * 1024, i)); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java index 0e3d344..fb3c47a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java @@ -87,7 +87,7 @@ private void saturateCluster(FSSchedulerNode schedulerNode) { while (!Resources.isNone(schedulerNode.getUnallocatedResource())) { createDefaultContainer(); schedulerNode.allocateContainer(containers.get(containers.size() - 1)); - schedulerNode.containerStarted(containers.get(containers.size() - 1). + schedulerNode.containerLaunched(containers.get(containers.size() - 1). 
getContainerId()); } } @@ -183,9 +183,9 @@ public void testMultipleAllocations() { assertEquals("Nothing should have been allocated, yet", Resources.none(), schedulerNode.getAllocatedResource()); schedulerNode.allocateContainer(containers.get(0)); - schedulerNode.containerStarted(containers.get(0).getContainerId()); + schedulerNode.containerLaunched(containers.get(0).getContainerId()); schedulerNode.allocateContainer(containers.get(1)); - schedulerNode.containerStarted(containers.get(1).getContainerId()); + schedulerNode.containerLaunched(containers.get(1).getContainerId()); schedulerNode.allocateContainer(containers.get(2)); assertEquals("Container should be allocated", Resources.multiply(containers.get(0).getContainer().getResource(), 3.0), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 941c215..b978bee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3288,12 +3288,12 @@ private void testAMStrictLocality(boolean node, boolean invalid) scheduler.handle(node2UpdateEvent); if (invalid) { assertEquals(0, app.getLiveContainers().size()); - assertEquals(0, scheduler.getNode(node2.getNodeID()).getNumContainers()); - assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers()); + assertEquals(0, scheduler.getNode(node2.getNodeID()).getNumGuaranteedContainers()); + assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumGuaranteedContainers()); } else { assertEquals(1, app.getLiveContainers().size()); - assertEquals(1, scheduler.getNode(node2.getNodeID()).getNumContainers()); - assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers()); + assertEquals(1, scheduler.getNode(node2.getNodeID()).getNumGuaranteedContainers()); + assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumGuaranteedContainers()); } }
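Beyond the renames exercised by the tests above, the core behavioral change in this patch is that SchedulerNode keeps separate bookkeeping for GUARANTEED and OPPORTUNISTIC containers, and ContainerUpdateContext#swapContainer() calls SchedulerNode#updateContainer() so node-level accounting follows a container across a promotion or demotion. The following self-contained sketch only models the shape of that logic (two maps keyed by container id plus an update that re-files a container under its new execution type); the class and its API are hypothetical and do not use the YARN classes.

import java.util.HashMap;
import java.util.Map;

public final class DualContainerLedgerSketch {

  enum ExecType { GUARANTEED, OPPORTUNISTIC }

  // One ledger per execution type, keyed by container id, mirroring the
  // launchedGuaranteedContainers / launchedOpportunisticContainers split.
  private final Map<Integer, ExecType> guaranteed = new HashMap<>();
  private final Map<Integer, ExecType> opportunistic = new HashMap<>();

  synchronized void allocate(int containerId, ExecType type) {
    (type == ExecType.GUARANTEED ? guaranteed : opportunistic)
        .put(containerId, type);
  }

  synchronized void release(int containerId) {
    if (guaranteed.remove(containerId) == null) {
      opportunistic.remove(containerId);
    }
  }

  // Mirrors updateContainer(): drop the old accounting, then re-file the
  // container under whatever execution type it carries after the update
  // (e.g. an OPPORTUNISTIC -> GUARANTEED promotion).
  synchronized void update(int containerId, ExecType newType) {
    release(containerId);
    allocate(containerId, newType);
  }

  synchronized int numGuaranteed() { return guaranteed.size(); }

  synchronized int numOpportunistic() { return opportunistic.size(); }

  public static void main(String[] args) {
    DualContainerLedgerSketch node = new DualContainerLedgerSketch();
    node.allocate(1, ExecType.GUARANTEED);
    node.allocate(2, ExecType.OPPORTUNISTIC);
    node.update(2, ExecType.GUARANTEED); // promotion
    System.out.println(node.numGuaranteed() + " guaranteed, "
        + node.numOpportunistic() + " opportunistic"); // 2 guaranteed, 0 opportunistic
  }
}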