diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 3b3070864e3..411c2d66fa2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -350,6 +350,9 @@ public OpportunisticContainerAllocator( } } + allocatedContainers.addAll( + opportContext.pullOverAllocationContainers()); + return allocatedContainers; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 246d450668d..74faf8796f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,9 +18,16 @@ package org.apache.hadoop.yarn.server.scheduler; +import java.util.ArrayList; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +74,8 @@ private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>> outstandingOpReqs = new TreeMap<>(); + private final List<Container> overAllocationContainers = new ArrayList<>(); + public AllocationParams getAppParams() { return appParams; } @@ -98,6 +107,21 @@ public synchronized void updateNodeList(List<RemoteNode> newNodeList) { } } + public synchronized void addToOverAllocationContainers(Container container) { + overAllocationContainers.add(container); + } + + public synchronized List<Container> pullOverAllocationContainers() { + List<Container> retList = new ArrayList<>(); + if (!overAllocationContainers.isEmpty()) { + retList.addAll(overAllocationContainers); + overAllocationContainers.clear(); + } + return retList; + } + public void updateAllocationParams(Resource minResource, Resource maxResource, Resource incrResource, int containerTokenExpiryInterval) { appParams.setMinResource(minResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 1302d1d6df3..27d1063580e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1111,6 +1111,9 @@ protected void nodeUpdate(RMNode nm) { updateSchedulerHealthInformation(releasedResources, releasedContainers); if (schedulerNode != null) { updateNodeResourceUtilization(nm); + if (LOG.isDebugEnabled()) { + LOG.debug("Updated Node resource utilization [" + schedulerNode.getNodeUtilization() + "]"); + } } // Now node data structures are up-to-date and ready for scheduling. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 38649797802..3b77acad0e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -624,19 +624,38 @@ public synchronized Resource allowedResourceForOverAllocation() { return Resources.none(); } + ResourceUtilization nodeUtilization = getNodeUtilization(); + if (nodeUtilization == null) { + return Resources.none(); + } ResourceUtilization projectedNodeUtilization = ResourceUtilization. - newInstance(getNodeUtilization()); + newInstance(nodeUtilization); + if (projectedNodeUtilization.getPhysicalMemory() == 0 && + projectedNodeUtilization.getVirtualMemory() == 0 && + projectedNodeUtilization.getCPU() == 0.0f) { + // Nothing running on the node... 
so don't over-allocate + return Resources.none(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Current Utilization of node [" + rmNode.getNodeID() + "]: " + + projectedNodeUtilization); + } // account for resources allocated in this heartbeat projectedNodeUtilization.addTo( (int) (resourceAllocatedPendingLaunch.getMemorySize()), 0, (float) resourceAllocatedPendingLaunch.getVirtualCores() / capacity.getVirtualCores()); + if (LOG.isDebugEnabled()) { + LOG.debug("Projected Utilization of node [" + rmNode.getNodeID() + "]: " + + projectedNodeUtilization); + } ResourceThresholds thresholds = overAllocationInfo.getOverAllocationThresholds(); Resource overAllocationThreshold = Resources.createResource( (long) (capacity.getMemorySize() * thresholds.getMemoryThreshold()), (int) (capacity.getVirtualCores() * thresholds.getCpuThreshold())); + long allowedMemory = Math.max(0, overAllocationThreshold.getMemorySize() - projectedNodeUtilization.getPhysicalMemory()); int allowedCpu = Math.max(0, (int) @@ -646,6 +665,11 @@ public synchronized Resource allowedResourceForOverAllocation() { Resource resourceAllowedForOpportunisticContainers = Resources.createResource(allowedMemory, allowedCpu); + if (LOG.isDebugEnabled()) { + LOG.debug("Over allocation threshold : " + overAllocationThreshold); + LOG.debug("Allowed Mem : " + allowedMemory); + LOG.debug("Allowed CPU : " + allowedCpu); + } // TODO cap the resources allocated to OPPORTUNISTIC containers on a node // in terms of its capacity. i.e. return min(max_ratio * capacity, allowed) return resourceAllowedForOpportunisticContainers; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index e3050859c57..b5f30acb50f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -684,6 +685,7 @@ public boolean hasChildQueues() { boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, ResourceLimits currentResourceLimits, + Resource resForOverAlloc, Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { try { readLock.lock(); @@ -703,6 +705,9 @@ boolean canAssignToThisQueue(Resource clusterResource, Resource nowTotalUsed = queueUsage.getUsed(nodePartition); + // Suppress queue usage to allow allocation + nowTotalUsed = Resources.subtract(nowTotalUsed, resForOverAlloc); + // Set headroom for 
currentResourceLimits: // When queue is a parent queue: Headroom = limit - used + killable // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) @@ -930,7 +935,7 @@ public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { return assignContainers(clusterResource, new SimplePlacementSet<>(node), - resourceLimits, schedulingMode); + Resources.none(), resourceLimits, schedulingMode); } @Override @@ -1046,4 +1051,12 @@ public Priority getPriority() { public Map<String, Float> getUserWeights() { return userWeights; } + + @Override + public CSAssignment assignContainers(Resource clusterResource, + PlacementSet<FiCaSchedulerNode> ps, + ResourceLimits resourceLimits, SchedulingMode schedulingMode) { + return assignContainers(clusterResource, ps, Resources.none(), + resourceLimits, schedulingMode); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 2cae9a95080..bd34d3cb75e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -47,6 +47,8 @@ private FiCaSchedulerApp application; private SkippedType skipped; + private boolean isOverAllocatedAssignment; + /** * Reason for the queue to get skipped.
*/ @@ -92,10 +94,18 @@ public CSAssignment(Resource resource, NodeType type, this.assignmentInformation = new AssignmentInformation(); } + public boolean isOverAllocatedAssignment() { + return isOverAllocatedAssignment; + } + + public void setOverAllocatedAssignment(boolean overAllocatedAssignment) { + isOverAllocatedAssignment = overAllocatedAssignment; + } + public Resource getResource() { return resource; } - + public void setResource(Resource resource) { this.resource = resource; } @@ -103,11 +113,11 @@ public void setResource(Resource resource) { public NodeType getType() { return type; } - + public void setType(NodeType type) { this.type = type; } - + public FiCaSchedulerApp getApplication() { return application; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 95d8d9b1182..f893438070e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; /** - * CSQueue represents a node in the tree of + * CSQueue represents a node in the tree of * hierarchical queues in the {@link CapacityScheduler}. */ @Stable @@ -93,19 +93,19 @@ public float getCapacity(); /** - * Get capacity of the parent of the queue as a function of the + * Get capacity of the parent of the queue as a function of the * cumulative capacity in the cluster. - * @return capacity of the parent of the queue as a function of the + * @return capacity of the parent of the queue as a function of the * cumulative capacity in the cluster */ public float getAbsoluteCapacity(); - + /** - * Get the configured maximum-capacity of the queue. + * Get the configured maximum-capacity of the queue. * @return the configured maximum-capacity of the queue */ public float getMaximumCapacity(); - + /** * Get maximum-capacity of the queue as a funciton of the cumulative capacity * of the cluster. @@ -113,7 +113,7 @@ * of the cluster */ public float getAbsoluteMaximumCapacity(); - + /** * Get the current absolute used capacity of the queue * relative to the entire cluster. @@ -131,32 +131,32 @@ /** * Get the currently utilized resources which allocated at nodes without any * labels in the cluster by the queue and children (if any). - * + * * @return used resources by the queue and it's children */ public Resource getUsedResources(); - + /** * Get the current run-state of the queue * @return current run-state */ public QueueState getState(); - + /** * Get child queues * @return child queues */ public List getChildQueues(); - + /** * Check if the user has permission to perform the operation * @param acl ACL * @param user user - * @return true if the user has the permission, + * @return true if the user has the permission, * false otherwise */ public boolean hasAccess(QueueACL acl, UserGroupInformation user); - + /** * Submit a new application to the queue. 
* @param applicationId the applicationId of the application being submitted @@ -189,31 +194,36 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, * Assign containers to applications in the queue or it's children (if any). * @param clusterResource the resource of the cluster. * @param ps {@link PlacementSet} of nodes which resources are available - * @param resourceLimits how much overall resource of this queue can use. - * @param schedulingMode Type of exclusive check when assign container on a + * @param resWithOverAlloc the amount of {@link Resource} available for over-allocation. + * @param resourceLimits how much overall resource this queue can use. + * @param schedulingMode Type of exclusive check when assigning a container on a * NodeManager, see {@link SchedulingMode}. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits, - SchedulingMode schedulingMode); - + PlacementSet<FiCaSchedulerNode> ps, Resource resWithOverAlloc, + ResourceLimits resourceLimits, SchedulingMode schedulingMode); + + CSAssignment assignContainers(Resource clusterResource, + PlacementSet<FiCaSchedulerNode> ps, + ResourceLimits resourceLimits, SchedulingMode schedulingMode); + /** * A container assigned to the queue has completed. * @param clusterResource the resource of the cluster * @param application application to which the container was assigned * @param node node on which the container completed - * @param container completed container, + * @param container completed container, * null if it was just a reservation - * @param containerStatus ContainerStatus for the completed + * @param containerStatus ContainerStatus for the completed * container - * @param childQueue CSQueue to reinsert in childQueues + * @param childQueue CSQueue to reinsert in childQueues * @param event event to be sent to the container * @param sortQueues indicates whether it should re-sort the queues */ public void completedContainer(Resource clusterResource, - FiCaSchedulerApp application, FiCaSchedulerNode node, - RMContainer container, ContainerStatus containerStatus, + FiCaSchedulerApp application, FiCaSchedulerNode node, + RMContainer container, ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue, boolean sortQueues); @@ -223,13 +228,13 @@ public void completedContainer(Resource clusterResource, */ public int getNumApplications(); - + /** * Reinitialize the queue. * @param newlyParsedQueue new queue to re-initalize from * @param clusterResource resources in the cluster */ - public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException; /** @@ -239,13 +244,13 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) */ public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits); - + /** * Get the {@link AbstractUsersManager} for the queue. * @return the AbstractUsersManager for the queue */ public AbstractUsersManager getAbstractUsersManager(); - + /** * Adds all applications in the queue and its subqueues to the given collection.
* @param apps the collection to add the applications to @@ -295,7 +300,7 @@ public void attachContainer(Resource clusterResource, * @return queueCapacities */ public QueueCapacities getQueueCapacities(); - + /** * Get ResourceUsage of this queue * @return resourceUsage diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 936f4f74e0a..257a18df08d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1272,6 +1272,7 @@ private CSAssignment allocateContainerOnSingleNode(PlacementSet 0 && + calculator.computeAvailableContainers( + availableResWithOverAlloc, minimumAllocation) > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("This node has Resources for over-allocation " + + "[Total: " + availableResWithOverAlloc + "], " + + "[Minimum: " + minimumAllocation + "], " + + "[OverAlloc: " + node.allowedResourceForOverAllocation() + "].."); + } + tryOverCommit = true; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("This node or this node partition doesn't have available or " + + "killable resource"); + } + return null; } - return null; } if (LOG.isDebugEnabled()) { @@ -1338,13 +1357,34 @@ private CSAssignment allocateContainerOnSingleNode(PlacementSet 0; + if ((CSAssignment.NULL_ASSIGNMENT.equals(firstTry) || + CSAssignment.SKIP_ASSIGNMENT.equals(firstTry) || + firstTry == null) && oppPossible && + availableResWithOverAlloc.getMemorySize() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying overallocation on node[" + node.getNodeID() + "]: " + + "[Available With Over-Alloc: " + availableResWithOverAlloc + "], " + + "[Minimum: " + minimumAllocation + "]"); + } + CSAssignment overAllocAssignment = allocateOrReserveNewContainers( + ps, availableResWithOverAlloc, withNodeHeartbeat); + return overAllocAssignment; + } else { + return firstTry; + } } private CSAssignment allocateOrReserveNewContainers( - PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) { + PlacementSet<FiCaSchedulerNode> ps, Resource resWithOverAlloc, + boolean withNodeHeartbeat) { CSAssignment assignment = getRootQueue().assignContainers( - getClusterResource(), ps, new ResourceLimits(labelManager + getClusterResource(), ps, resWithOverAlloc, new ResourceLimits(labelManager .getResourceByLabel(ps.getPartition(), getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -1379,6 +1419,7 @@ private CSAssignment allocateOrReserveNewContainers( // Try to use NON_EXCLUSIVE assignment = getRootQueue().assignContainers(getClusterResource(), ps, + resWithOverAlloc, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well.
new ResourceLimits(labelManager @@ -1409,7 +1450,7 @@ private CSAssignment allocateContainersOnMultiNodes( return null; } - return allocateOrReserveNewContainers(ps, false); + return allocateOrReserveNewContainers(ps, Resources.none(), false); } @VisibleForTesting @@ -1462,7 +1503,7 @@ public void handle(SchedulerEvent event) { { NodeLabelsUpdateSchedulerEvent labelUpdateEvent = (NodeLabelsUpdateSchedulerEvent) event; - + updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; @@ -2387,6 +2428,10 @@ public ResourceUsage getClusterResourceUsage() { @VisibleForTesting public void submitResourceCommitRequest(Resource cluster, CSAssignment csAssignment) { + if (LOG.isDebugEnabled()) { + LOG.debug("Is OverAllocated Assignment ? [" + + csAssignment.isOverAllocatedAssignment() + "]"); + } ResourceCommitRequest request = createResourceCommitRequest(csAssignment); @@ -2430,6 +2475,7 @@ public void submitResourceCommitRequest(Resource cluster, csAssignment.getSchedulingMode() : SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, csAssignment.getResource()); + allocated.setOverAllocation(csAssignment.isOverAllocatedAssignment()); } // Reserved something @@ -2447,6 +2493,7 @@ public void submitResourceCommitRequest(Resource cluster, csAssignment.getSchedulingMode() : SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, csAssignment.getResource()); + allocated.setOverAllocation(csAssignment.isOverAllocatedAssignment()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 41c6f090849..cab0c0d088c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -90,9 +90,6 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @Private @Unstable @@ -522,7 +519,7 @@ public User getUser(String userName) { @Override public void reinitialize( - CSQueue newlyParsedQueue, Resource clusterResource) + CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { try { writeLock.lock(); @@ -1019,7 +1016,8 @@ private CSAssignment allocateFromReservedContainer( ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node.getNodeID(), SystemClock.getInstance().getTime(), application); CSAssignment assignment = application.assignContainers(clusterResource, - ps, currentResourceLimits, schedulingMode, reservedContainer); + ps, Resources.none(), currentResourceLimits, + schedulingMode, reservedContainer); return assignment; } } @@ -1029,8 +1027,8 @@ private CSAssignment allocateFromReservedContainer( @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet ps, ResourceLimits currentResourceLimits, - SchedulingMode schedulingMode) { + PlacementSet ps, Resource resForOverAlloc, + ResourceLimits 
currentResourceLimits, SchedulingMode schedulingMode) { updateCurrentResourceLimits(currentResourceLimits, clusterResource); FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); @@ -1087,7 +1085,7 @@ public CSAssignment assignContainers(Resource clusterResource, Resource appReserved = application.getCurrentReservation(); if (needAssignToQueueCheck) { if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), - currentResourceLimits, appReserved, schedulingMode)) { + currentResourceLimits, resForOverAlloc, appReserved, schedulingMode)) { ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( activitiesManager, node, application, application.getPriority(), ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); @@ -1121,7 +1119,8 @@ public CSAssignment assignContainers(Resource clusterResource, userAssignable = false; } else { userAssignable = canAssignToUser(clusterResource, application.getUser(), - userLimit, application, node.getPartition(), currentResourceLimits); + resForOverAlloc, userLimit, application, node.getPartition(), + currentResourceLimits); if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) { cul.canAssign = false; cul.reservation = appReserved; @@ -1138,7 +1137,7 @@ public CSAssignment assignContainers(Resource clusterResource, // Try to schedule assignment = application.assignContainers(clusterResource, - ps, currentResourceLimits, schedulingMode, null); + ps, resForOverAlloc, currentResourceLimits, schedulingMode, null); if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " + application @@ -1195,6 +1194,11 @@ public boolean accept(Resource cluster, if (allocation.getAllocateFromReservedContainer() == null) { try { readLock.lock(); + // TODO: accept if over allocation for the time being + // Don't modify queue capacities + if (allocation.isOverAllocation()) { + return true; + } FiCaSchedulerApp app = schedulerContainer.getSchedulerApplicationAttempt(); String username = app.getUser(); @@ -1355,7 +1359,7 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit, private Resource getHeadroom(User user, Resource currentPartitionResourceLimit, Resource clusterResource, Resource userLimitResource, String partition) { - /** + /* * Headroom is: * min( * min(userLimit, queueMaxCap) - userConsumed, @@ -1514,6 +1518,15 @@ public Resource getResourceLimitForAllUsers(String userName, protected boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, String nodePartition, ResourceLimits currentResourceLimits) { + return canAssignToUser(clusterResource, userName, Resources.none(), + limit, application, nodePartition, currentResourceLimits); + } + + @Private + protected boolean canAssignToUser(Resource clusterResource, + String userName, Resource resForOverAlloc, + Resource limit, FiCaSchedulerApp application, + String nodePartition, ResourceLimits currentResourceLimits) { try { readLock.lock(); User user = getUser(userName); @@ -1528,6 +1541,11 @@ protected boolean canAssignToUser(Resource clusterResource, // Note: We aren't considering the current request since there is a fixed // overhead of the AM, but it's a > check, not a >= check, so... 
+ + // Add overAlloc resource to user limit + if (!Resources.none().equals(resForOverAlloc)) { + limit = Resources.add(limit, resForOverAlloc); + } if (Resources.greaterThan(resourceCalculator, clusterResource, user.getUsed(nodePartition), limit)) { // if enabled, check to see if could we potentially use this node instead @@ -1555,8 +1573,9 @@ protected boolean canAssignToUser(Resource clusterResource, } if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() - + " will exceed limit - " + " consumed: " + user - .getUsed(nodePartition) + " limit: " + limit); + + " will exceed limit - " + + " consumed: " + user.getUsed(nodePartition) + + " limit: " + limit); } return false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6800b74f8d4..96c0f589f27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -445,7 +445,7 @@ private void addApplication(ApplicationId applicationId, writeLock.unlock(); } } - + @Override public void finishApplication(ApplicationId application, String user) { @@ -479,7 +479,8 @@ private String getParentName() { @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet ps, ResourceLimits resourceLimits, + PlacementSet ps, Resource resForOverAlloc, + ResourceLimits resourceLimits, SchedulingMode schedulingMode) { FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); @@ -529,7 +530,7 @@ public CSAssignment assignContainers(Resource clusterResource, CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - while (canAssign(clusterResource, node)) { + while (canAssign(clusterResource, node, resForOverAlloc)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign containers to child-queue of " + getQueueName()); @@ -539,7 +540,7 @@ public CSAssignment assignContainers(Resource clusterResource, // This will also consider parent's limits and also continuous reservation // looking if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(), - resourceLimits, Resources + resourceLimits, resForOverAlloc, Resources .createResource(getMetrics().getReservedMB(), getMetrics().getReservedVirtualCores()), schedulingMode)) { @@ -556,7 +557,9 @@ public CSAssignment assignContainers(Resource clusterResource, // Schedule CSAssignment assignedToChild = assignContainersToChildQueues( - clusterResource, ps, resourceLimits, schedulingMode); + clusterResource, ps, resForOverAlloc, resourceLimits, schedulingMode); + assignment.setOverAllocatedAssignment( + assignedToChild.isOverAllocatedAssignment()); assignment.setType(assignedToChild.getType()); assignment.setRequestLocalityType( assignedToChild.getRequestLocalityType()); @@ -653,7 +656,8 @@ public CSAssignment assignContainers(Resource clusterResource, return assignment; } - private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { + private 
boolean canAssign(Resource clusterResource, FiCaSchedulerNode node, + Resource resForOverAlloc) { // When node == null means global scheduling is enabled, always return true if (null == node) { return true; @@ -663,9 +667,12 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { // 1) Node doesn't have reserved container // 2) Node's available-resource + killable-resource should > 0 return node.getReservedContainer() == null && Resources.greaterThanOrEqual( - resourceCalculator, clusterResource, Resources - .add(node.getUnallocatedResource(), - node.getTotalKillableResources()), minimumAllocation); + resourceCalculator, clusterResource, + Resources.addTo( + Resources.add( + node.getUnallocatedResource(), + node.getTotalKillableResources()), + resForOverAlloc), minimumAllocation); } private ResourceLimits getResourceLimitsOfChild(CSQueue child, @@ -710,7 +717,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, } private CSAssignment assignContainersToChildQueues(Resource cluster, - PlacementSet ps, ResourceLimits limits, + PlacementSet ps, Resource resForOverAlloc, ResourceLimits limits, SchedulingMode schedulingMode) { CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; @@ -732,7 +739,7 @@ private CSAssignment assignContainersToChildQueues(Resource cluster, ps.getPartition()); CSAssignment childAssignment = childQueue.assignContainers(cluster, ps, - childLimits, schedulingMode); + resForOverAlloc, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index d3ab01e0390..cbcbbc81163 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -71,6 +71,10 @@ public AbstractContainerAllocator(FiCaSchedulerApp application, protected CSAssignment getCSAssignmentFromAllocateResult( Resource clusterResource, ContainerAllocation result, RMContainer rmContainer, FiCaSchedulerNode node) { + if (LOG.isDebugEnabled()) { + LOG.debug("ContainerAllocation isOverAllocated [" + + result.isOverAllocation() + "]"); + } // Handle skipped CSAssignment.SkippedType skipped = (result.getAllocationState() == AllocationState.APP_SKIPPED) ? 
@@ -122,6 +126,12 @@ protected CSAssignment getCSAssignmentFromAllocateResult( ActivityDiagnosticConstant.EMPTY); } } else if (result.getAllocationState() == AllocationState.ALLOCATED){ + + assignment.setOverAllocatedAssignment(result.isOverAllocation()); + if (result.isOverAllocation()) { + clusterResource = Resources.add( + clusterResource, result.getResourceToBeAllocated()); + } // This is a new container // Inform the ordering policy LOG.info("assignedContainer" + " application attempt=" + application @@ -161,6 +171,10 @@ protected CSAssignment getCSAssignmentFromAllocateResult( } } + if (LOG.isDebugEnabled() && assignment != null) { + LOG.debug("Created CSAssignment isOverAlloc [" + + assignment.isOverAllocatedAssignment() + "]"); + } return assignment; } @@ -178,12 +192,14 @@ protected CSAssignment getCSAssignmentFromAllocateResult( * * @param clusterResource clusterResource * @param ps PlacementSet + * @param resForOverAlloc Resource for OverAllocation. * @param schedulingMode scheduling mode (exclusive or nonexclusive) * @param resourceLimits resourceLimits * @param reservedContainer reservedContainer * @return CSAssignemnt proposal */ public abstract CSAssignment assignContainers(Resource clusterResource, - PlacementSet ps, SchedulingMode schedulingMode, + PlacementSet ps, Resource resForOverAlloc, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index f4085085004..919860649d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -59,6 +59,7 @@ AllocationState state; NodeType containerNodeType = NodeType.NODE_LOCAL; NodeType requestLocalityType = null; + private boolean isOverAllocation = false; /** * When some (new) container allocated/reserved or some increase container @@ -85,6 +86,14 @@ public Resource getResourceToBeAllocated() { return resourceToBeAllocated; } + public boolean isOverAllocation() { + return isOverAllocation; + } + + public void setOverAllocation(boolean overAllocation) { + isOverAllocation = overAllocation; + } + public AllocationState getAllocationState() { return state; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 4879fae0562..cd376465c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -50,10 +50,11 @@ public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet ps, SchedulingMode schedulingMode, + PlacementSet ps, Resource resForOverAlloc, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { return regularContainerAllocator.assignContainers(clusterResource, - ps, schedulingMode, resourceLimits, reservedContainer); + ps, resForOverAlloc, schedulingMode, resourceLimits, reservedContainer); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 51007b59329..4045251cf26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -230,6 +231,7 @@ private ContainerAllocation checkIfNodeBlackListed(FiCaSchedulerNode node, ContainerAllocation tryAllocateOnNode(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, + Resource resForOverAlloc, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { ContainerAllocation result; @@ -247,9 +249,8 @@ ContainerAllocation tryAllocateOnNode(Resource clusterResource, // Try to allocate containers on node result = - assignContainersOnNode(clusterResource, node, schedulerKey, - reservedContainer, schedulingMode, resourceLimits); - + assignContainersOnNode(clusterResource, node, resForOverAlloc, + schedulerKey, reservedContainer, schedulingMode, resourceLimits); if (null == reservedContainer) { if (result.state == AllocationState.PRIORITY_SKIPPED) { // Don't count 'skipped nodes' as a scheduling opportunity! 
@@ -348,11 +349,11 @@ private ContainerAllocation assignNodeLocalContainers( Resource clusterResource, PendingAsk nodeLocalAsk, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { + ResourceLimits currentResoureLimits, Resource resForOverAlloc) { if (canAssign(schedulerKey, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, schedulerKey, nodeLocalAsk, NodeType.NODE_LOCAL, reservedContainer, - schedulingMode, currentResoureLimits); + schedulingMode, currentResoureLimits, resForOverAlloc); } // Skip node-local request, go to rack-local request @@ -366,11 +367,11 @@ private ContainerAllocation assignRackLocalContainers( Resource clusterResource, PendingAsk rackLocalAsk, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { + ResourceLimits currentResoureLimits, Resource resForOverAlloc) { if (canAssign(schedulerKey, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, schedulerKey, rackLocalAsk, NodeType.RACK_LOCAL, reservedContainer, - schedulingMode, currentResoureLimits); + schedulingMode, currentResoureLimits, resForOverAlloc); } // Skip rack-local request, go to off-switch request @@ -384,11 +385,11 @@ private ContainerAllocation assignOffSwitchContainers( Resource clusterResource, PendingAsk offSwitchAsk, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { + ResourceLimits currentResoureLimits, Resource resForOverAlloc) { if (canAssign(schedulerKey, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, schedulerKey, offSwitchAsk, NodeType.OFF_SWITCH, reservedContainer, - schedulingMode, currentResoureLimits); + schedulingMode, currentResoureLimits, resForOverAlloc); } application.updateAppSkipNodeDiagnostics( @@ -400,7 +401,7 @@ private ContainerAllocation assignOffSwitchContainers( } private ContainerAllocation assignContainersOnNode(Resource clusterResource, - FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, + FiCaSchedulerNode node, Resource resForOverAlloc, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { Priority priority = schedulerKey.getPriority(); @@ -416,7 +417,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, allocation = assignNodeLocalContainers(clusterResource, nodeLocalAsk, node, schedulerKey, reservedContainer, schedulingMode, - currentResoureLimits); + currentResoureLimits, resForOverAlloc); if (Resources.greaterThan(rc, clusterResource, allocation.getResourceToBeAllocated(), Resources.none())) { allocation.requestLocalityType = requestLocalityType; @@ -442,7 +443,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, allocation = assignRackLocalContainers(clusterResource, rackLocalAsk, node, schedulerKey, reservedContainer, schedulingMode, - currentResoureLimits); + currentResoureLimits, resForOverAlloc); if (Resources.greaterThan(rc, clusterResource, allocation.getResourceToBeAllocated(), Resources.none())) { allocation.requestLocalityType = requestLocalityType; @@ -468,7 +469,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, allocation = 
assignOffSwitchContainers(clusterResource, offSwitchAsk, node, schedulerKey, reservedContainer, schedulingMode, - currentResoureLimits); + currentResoureLimits, resForOverAlloc); // When a returned allocation is LOCALITY_SKIPPED, since we're in // off-switch request now, we will skip this app w.r.t priorities @@ -488,7 +489,8 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, private ContainerAllocation assignContainer(Resource clusterResource, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, PendingAsk pendingAsk, NodeType type, RMContainer rmContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits, + Resource resForOverAlloc) { Priority priority = schedulerKey.getPriority(); if (LOG.isDebugEnabled()) { @@ -502,6 +504,15 @@ private ContainerAllocation assignContainer(Resource clusterResource, Resource available = node.getUnallocatedResource(); Resource totalResource = node.getCapacity(); + boolean isOverAllocation = !resForOverAlloc.equals(Resources.none()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Node[" + node.getNodeID() + " ] resources:" + + " cluster Resource [" + clusterResource + "]" + + " requested capability [" + capability + "]" + + " over-allocatable resources [" + resForOverAlloc + "]" + + " node resources [" + totalResource + "]"); + } if (!Resources.lessThanOrEqual(rc, clusterResource, capability, totalResource)) { LOG.warn("Node : " + node.getNodeID() @@ -559,12 +570,12 @@ private ContainerAllocation assignContainer(Resource clusterResource, } } - if (availableContainers > 0) { + if (availableContainers > 0 || isOverAllocation) { // Allocate... // We will only do continuous reservation when this is not allocated from // reserved container if (rmContainer == null && reservationsContinueLooking - && node.getLabels().isEmpty()) { + && node.getLabels().isEmpty() && !isOverAllocation) { // when reservationsContinueLooking is set, we may need to unreserve // some containers to meet this queue, its parents', or the users' // resource limits. @@ -599,8 +610,12 @@ private ContainerAllocation assignContainer(Resource clusterResource, ContainerAllocation result = new ContainerAllocation(unreservedContainer, pendingAsk.getPerAllocationResource(), AllocationState.ALLOCATED); + result.setOverAllocation(isOverAllocation); result.containerNodeType = type; result.setToKillContainers(toKillContainers); + if (LOG.isDebugEnabled()) { + LOG.debug("isOverAllocation ? [" + result.isOverAllocation() + "]"); + } return result; } else { // if we are allowed to allocate but this node doesn't have space, reserve @@ -667,14 +682,15 @@ boolean shouldAllocOrReserveNewContainer( return (((starvation + requiredContainers) - reservedContainers) > 0); } - private Container getContainer(RMContainer rmContainer, - FiCaSchedulerNode node, Resource capability, + private Container getContainer(ContainerAllocation allocationResult, + RMContainer rmContainer, FiCaSchedulerNode node, Resource capability, SchedulerRequestKey schedulerKey) { return (rmContainer != null) ? 
rmContainer.getContainer() - : createContainer(node, capability, schedulerKey); + : createContainer(allocationResult, node, capability, schedulerKey); } - private Container createContainer(FiCaSchedulerNode node, Resource capability, + private Container createContainer(ContainerAllocation allocationResult, + FiCaSchedulerNode node, Resource capability, SchedulerRequestKey schedulerKey) { NodeId nodeId = node.getRMNode().getNodeID(); @@ -686,6 +702,8 @@ private Container createContainer(FiCaSchedulerNode node, Resource capability, return BuilderUtils.newContainer(null, nodeId, node.getRMNode().getHttpAddress(), capability, schedulerKey.getPriority(), null, + allocationResult.isOverAllocation() ? + ExecutionType.OPPORTUNISTIC : ExecutionType.GUARANTEED, schedulerKey.getAllocationRequestId()); } @@ -718,7 +736,7 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, RMContainer reservedContainer) { // Create the container if necessary Container container = - getContainer(reservedContainer, node, + getContainer(allocationResult, reservedContainer, node, allocationResult.getResourceToBeAllocated(), schedulerKey); // something went wrong getting/creating the container @@ -784,17 +802,20 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, } private ContainerAllocation allocate(Resource clusterResource, - PlacementSet ps, SchedulingMode schedulingMode, + PlacementSet ps, Resource resForOverAlloc, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { // Do checks before determining which node to allocate // Directly return if this check fails. ContainerAllocation result; if (reservedContainer == null) { - result = preCheckForPlacementSet(clusterResource, ps, schedulingMode, - resourceLimits, schedulerKey); - if (null != result) { - return result; + if (resForOverAlloc.equals(Resources.none())) { + result = preCheckForPlacementSet(clusterResource, ps, schedulingMode, + resourceLimits, schedulerKey); + if (null != result) { + return result; + } } } else { // pre-check when allocating reserved container @@ -817,7 +838,7 @@ private ContainerAllocation allocate(Resource clusterResource, FiCaSchedulerNode node = iter.next(); result = tryAllocateOnNode(clusterResource, node, schedulingMode, - resourceLimits, schedulerKey, reservedContainer); + resForOverAlloc, resourceLimits, schedulerKey, reservedContainer); if (AllocationState.ALLOCATED == result.state || AllocationState.RESERVED == result.state) { @@ -831,9 +852,9 @@ private ContainerAllocation allocate(Resource clusterResource, @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, - RMContainer reservedContainer) { + PlacementSet ps, Resource resForOverAlloc, + SchedulingMode schedulingMode, + ResourceLimits resourceLimits, RMContainer reservedContainer) { FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); if (reservedContainer == null) { @@ -854,9 +875,14 @@ public CSAssignment assignContainers(Resource clusterResource, // Schedule in priority order for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { ContainerAllocation result = - allocate(clusterResource, ps, schedulingMode, resourceLimits, + allocate(clusterResource, ps, resForOverAlloc, + schedulingMode, resourceLimits, schedulerKey, null); - + if (result != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Is OverAllocation ? 
[" + result.isOverAllocation() + "]"); + } + } AllocationState allocationState = result.getAllocationState(); if (allocationState == AllocationState.PRIORITY_SKIPPED) { continue; @@ -872,9 +898,11 @@ public CSAssignment assignContainers(Resource clusterResource, ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); return CSAssignment.SKIP_ASSIGNMENT; } else { + // TODO: Ignore reservation for over-allocation.. Fix.. ContainerAllocation result = - allocate(clusterResource, ps, schedulingMode, resourceLimits, - reservedContainer.getReservedSchedulerKey(), reservedContainer); + allocate(clusterResource, ps, Resources.none(), + schedulingMode, resourceLimits, reservedContainer.getReservedSchedulerKey(), + reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, reservedContainer, node); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java index 2921e7f21d8..dcd057e8a24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java @@ -51,6 +51,8 @@ private Resource allocatedResource; // newly allocated resource + private boolean isOverAllocation = false; + public ContainerAllocationProposal( SchedulerContainer allocatedOrReservedContainer, List> toRelease, @@ -69,6 +71,14 @@ public ContainerAllocationProposal( this.allocatedResource = allocatedResource; } + public boolean isOverAllocation() { + return isOverAllocation; + } + + public void setOverAllocation(boolean overAllocation) { + isOverAllocation = overAllocation; + } + public SchedulingMode getSchedulingMode() { return schedulingMode; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index a8cbe5d2736..8a4edfe5e3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -341,6 +343,17 @@ private boolean commonCheckContainerAllocation( // Do we have enough space on this node? Resource availableResource = Resources.clone( schedulerContainer.getSchedulerNode().getUnallocatedResource()); + if (allocation.isOverAllocation()) { + availableResource = Resources.addTo(availableResource, + schedulerContainer.getSchedulerNode().allowedResourceForOverAllocation()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Is Over Allocation ? " + allocation.isOverAllocation()); + LOG.debug("Allowed resources for overAlloc : " + + schedulerContainer.getSchedulerNode().allowedResourceForOverAllocation()); + LOG.debug("Available resource : " + availableResource); + LOG.debug("Requested resource : " + allocation.getAllocatedOrReservedResource()); + } // If we have any to-release container in non-reserved state, they are // from lazy-preemption, add their consumption to available resource @@ -533,42 +546,58 @@ public boolean apply(Resource cluster, schedulerContainer.getSchedulerNode(), reservedContainer); } - // Allocate a new container - addToNewlyAllocatedContainers( - schedulerContainer.getSchedulerNode(), rmContainer); - liveContainers.put(containerId, rmContainer); + if (allocation.isOverAllocation()) { + // Let the OpportunisticContext take care of the rest.. + Container container = + schedulerContainer.getRmContainer().getContainer(); + container.setContainerToken(rmContext.getContainerTokenSecretManager() + .createContainerToken(container.getId(), container.getVersion(), + container.getNodeId(), getUser(), container.getResource(), + container.getPriority(), rmContainer.getCreationTime(), + null, rmContainer.getNodeLabelExpression(), + ContainerType.TASK, ExecutionType.OPPORTUNISTIC)); + getOpportunisticContainerContext(). 
+ addToOverAllocationContainers(container); + LOG.info("Allocating OPPORTUNISTIC container [" + containerId + "] " + + "for over-allocation"); + } else { + // Allocate a new container + addToNewlyAllocatedContainers( + schedulerContainer.getSchedulerNode(), rmContainer); + liveContainers.put(containerId, rmContainer); - // Deduct pending resource requests - List requests = appSchedulingInfo.allocate( - allocation.getAllocationLocalityType(), - schedulerContainer.getSchedulerNode(), - schedulerContainer.getSchedulerRequestKey(), - schedulerContainer.getRmContainer().getContainer()); - ((RMContainerImpl) rmContainer).setResourceRequests(requests); + // Deduct pending resource requests + List requests = appSchedulingInfo.allocate( + allocation.getAllocationLocalityType(), + schedulerContainer.getSchedulerNode(), + schedulerContainer.getSchedulerRequestKey(), + schedulerContainer.getRmContainer().getContainer()); + ((RMContainerImpl) rmContainer).setResourceRequests(requests); - attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), - allocation.getAllocatedOrReservedResource()); + attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), + allocation.getAllocatedOrReservedResource()); - rmContainer.handle( - new RMContainerEvent(containerId, RMContainerEventType.START)); + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.START)); - // Inform the node - schedulerContainer.getSchedulerNode().allocateContainer( - rmContainer); + // Inform the node + schedulerContainer.getSchedulerNode().allocateContainer( + rmContainer); - // update locality statistics, - incNumAllocatedContainers(allocation.getAllocationLocalityType(), - allocation.getRequestLocalityType()); + // update locality statistics, + incNumAllocatedContainers(allocation.getAllocationLocalityType(), + allocation.getRequestLocalityType()); - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationAttemptId=" + containerId - .getApplicationAttemptId() + " container=" + containerId - + " host=" + rmContainer.getAllocatedNode().getHost() - + " type=" + allocation.getAllocationLocalityType()); + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + containerId + .getApplicationAttemptId() + " container=" + containerId + + " host=" + rmContainer.getAllocatedNode().getHost() + + " type=" + allocation.getAllocationLocalityType()); + } + RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, + "SchedulerApp", getApplicationId(), containerId, + allocation.getAllocatedOrReservedResource()); } - RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, - "SchedulerApp", getApplicationId(), containerId, - allocation.getAllocatedOrReservedResource()); } else { // If the rmContainer's state is already updated to RESERVED, this is // a reReservation @@ -864,7 +893,8 @@ public LeafQueue getCSLeafQueue() { } public CSAssignment assignContainers(Resource clusterResource, - PlacementSet ps, ResourceLimits currentResourceLimits, + PlacementSet ps, Resource resForOverAlloc, + ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, RMContainer reservedContainer) { if (LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " @@ -873,7 +903,8 @@ public CSAssignment assignContainers(Resource clusterResource, } return containerAllocator.assignContainers(clusterResource, ps, - schedulingMode, currentResourceLimits, reservedContainer); + resForOverAlloc, schedulingMode, currentResourceLimits, + reservedContainer); } public void 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index db92d7c98dc..d6d08e8e9ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -95,7 +97,7 @@ public NodeId getNodeId() {
   public int getHttpPort() {
     return httpPort;
   }
-  
+
   public void setHttpPort(int port) {
     httpPort = port;
   }
@@ -105,7 +107,7 @@ public void setResourceTrackerService(ResourceTrackerService resourceTracker) {
   }

   public void containerStatus(ContainerStatus containerStatus) throws Exception {
-    Map<ApplicationId, List<ContainerStatus>> conts = 
+    Map<ApplicationId, List<ContainerStatus>> conts =
         new HashMap<ApplicationId, List<ContainerStatus>>();
     conts.put(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(),
         Arrays.asList(new ContainerStatus[] { containerStatus }));
@@ -118,7 +120,7 @@ public void containerIncreaseStatus(Container container) throws Exception {
         container.getResource());
     List<Container> increasedConts = Collections.singletonList(container);
     nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
-        true, responseId);
+        true, responseId, null, null);
   }

   public void addRegisteringCollector(ApplicationId appId,
@@ -131,17 +133,29 @@ public void addRegisteringCollector(ApplicationId appId,
   }

   public RegisterNodeManagerResponse registerNode() throws Exception {
-    return registerNode(null, null);
+    return registerNode(null, null, null);
+  }
+
+  public RegisterNodeManagerResponse registerNodeWithOverAlloc(
+      OverAllocationInfo overAllocationInfo) throws Exception {
+    return registerNode(null, null, overAllocationInfo);
   }
-  
+
   public RegisterNodeManagerResponse registerNode(
       List<ApplicationId> runningApplications) throws Exception {
-    return registerNode(null, runningApplications);
+    return registerNode(null, runningApplications, null);
   }

   public RegisterNodeManagerResponse registerNode(
       List<NMContainerStatus> containerReports,
       List<ApplicationId> runningApplications) throws Exception {
+    return registerNode(containerReports, runningApplications, null);
+  }
+
+  public RegisterNodeManagerResponse registerNode(
+      List<NMContainerStatus> containerReports,
+      List<ApplicationId> runningApplications,
+      OverAllocationInfo overAllocationInfo) throws Exception {
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
@@ -151,6 +165,7 @@ public RegisterNodeManagerResponse registerNode(
     req.setContainerStatuses(containerReports);
     req.setNMVersion(version);
     req.setRunningApplications(runningApplications);
+    req.setOverAllocationInfo(overAllocationInfo);
     RegisterNodeManagerResponse registrationResponse =
         resourceTracker.registerNodeManager(req);
     this.currentContainerTokenMasterKey =
@@ -178,7 +193,15 @@ public RegisterNodeManagerResponse registerNode(
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
     return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
-        Collections.<Container>emptyList(), isHealthy, responseId);
+        Collections.<Container>emptyList(), isHealthy, responseId, null, null);
+  }
+
+  public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy,
+      ResourceUtilization nodeResUtilization,
+      ResourceUtilization containerResUtil) throws Exception {
+    return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
+        Collections.<Container>emptyList(), isHealthy, responseId,
+        nodeResUtilization, containerResUtil);
   }

   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
@@ -191,7 +214,7 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
     containerStatusList.add(containerStatus);
     Log.info("ContainerStatus: " + containerStatus);
     return nodeHeartbeat(containerStatusList,
-        Collections.<Container>emptyList(), true, responseId);
+        Collections.<Container>emptyList(), true, responseId, null, null);
   }

   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy, int resId)
       throws Exception {
     ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
     for (List<ContainerStatus> stats : conts.values()) {
       updatedStats.addAll(stats);
     }
     return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
-        isHealthy, resId);
+        isHealthy, resId, null, null);
   }

   public NodeHeartbeatResponse nodeHeartbeat(
       List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
     return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
-        isHealthy, responseId);
+        isHealthy, responseId, null, null);
   }

   public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
-      List<Container> increasedConts, boolean isHealthy, int resId)
+      List<Container> increasedConts, boolean isHealthy, int resId,
+      ResourceUtilization nodeResUtil, ResourceUtilization containerResUtil)
       throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
     status.setNodeId(nodeId);
+    if (nodeResUtil != null) {
+      status.setNodeUtilization(nodeResUtil);
+    }
+    if (containerResUtil != null) {
+      status.setContainersUtilization(containerResUtil);
+    }
     ArrayList<ContainerStatus> completedContainers =
         new ArrayList<ContainerStatus>();
     for (ContainerStatus stat : updatedStats) {
       if (stat.getState() == ContainerState.COMPLETE) {
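
The MockNM additions above give tests direct control over the utilization numbers the RM sees at registration and on each heartbeat. A usage fragment built only from the signatures added in this file (it is assumed to run inside a test method with a started MockRM named rm, as in the surrounding tests):

    // Register with over-allocation enabled at a 95% utilization threshold.
    MockNM nm = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
    nm.registerNodeWithOverAlloc(
        OverAllocationInfo.newInstance(ResourceThresholds.newInstance(0.95f)));

    // Heartbeat as healthy, reporting 1 GB of node utilization and the same
    // aggregate container utilization (pmem MB, vmem MB, CPU fraction).
    nm.nodeHeartbeat(true,
        ResourceUtilization.newInstance(1024, 0, 0.1f),
        ResourceUtilization.newInstance(1024, 0, 0.1f));
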
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 7f4a9f21a6e..4c72e77bc6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -70,6 +71,8 @@
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -158,6 +161,141 @@ public void stopRM() {
     }
   }

+  @Test(timeout = 600000)
+  public void testContainerOverAllocation() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 1024, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 5120, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNodeWithOverAlloc(
+        OverAllocationInfo.newInstance(
+            ResourceThresholds.newInstance(0.95f)));
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true, ResourceUtilization.newInstance(0, 0, 0.0f),
+        ResourceUtilization.newInstance(0, 0, 0.0f));
+
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+
+    // Both nodes will now be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true, ResourceUtilization.newInstance(0, 0, 0.0f),
+        ResourceUtilization.newInstance(0, 0, 0.0f));
+
+    // Ask for 3 containers; without over-allocation, only one can run
+    // at a time.
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(3 * GB), 3, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.GUARANTEED, true))),
+        null);
+
+    nm2.nodeHeartbeat(true, ResourceUtilization.newInstance(0, 0, 0.0f),
+        ResourceUtilization.newInstance(0, 0, 0.0f));
+    // Wait for the scheduler to process all events.
+    dispatcher.waitForEventThreadToWait();
+
+    Thread.sleep(1000);
+    AllocateResponse allocateResponse = am1.allocate(null, null);
+
+    // Only one of the containers is allocated as GUARANTEED, since there
+    // is no space left on the node for the others.
+    Assert.assertEquals(1, allocateResponse.getAllocatedContainers().size());
+    Container ac1 = allocateResponse.getAllocatedContainers().get(0);
+    Assert.assertEquals(ExecutionType.GUARANTEED, ac1.getExecutionType());
+
+    // Start the container on the NM.
+    nm2.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(ac1.getId(),
+            ExecutionType.GUARANTEED, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // NM heartbeat reporting that the 3 GB container is using only 1 GB.
+    nm2.nodeHeartbeat(true, ResourceUtilization.newInstance(1 * GB, 1 * GB, 0.2f),
+        ResourceUtilization.newInstance(1 * GB, 1 * GB, 0.2f));
+
+    // Wait for the scheduler to process all events.
+    dispatcher.waitForEventThreadToWait();
+
+    Thread.sleep(200);
+    allocateResponse = am1.allocate(null, null);
+
+    // One container is now allocated as OPPORTUNISTIC, using the
+    // over-allocated resources.
+    Assert.assertEquals(1, allocateResponse.getAllocatedContainers().size());
+    Container ac2 = allocateResponse.getAllocatedContainers().get(0);
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC, ac2.getExecutionType());
+
+    // ###### Phase 2 ######
+    System.out.println("\n\n ####### Phase 2 - BEFORE START ###### \n\n");
+
+    // Next container.
+    nm2.nodeHeartbeat(true, ResourceUtilization.newInstance(1 * GB, 0 * GB, 0.2f),
+        ResourceUtilization.newInstance(1 * GB, 0 * GB, 0.2f));
+    // Wait for the scheduler to process all events.
+    dispatcher.waitForEventThreadToWait();
+
+    Thread.sleep(200);
+    allocateResponse = am1.allocate(null, null);
+
+    // No containers should be allocated yet, since the second container
+    // has not started on the NM.
+    Assert.assertEquals(0, allocateResponse.getAllocatedContainers().size());
+
+    System.out.println("\n\n ####### Phase 2 - AFTER START ###### \n\n");
+
+    // Start the second container on the NM.
+    nm2.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(ac1.getId(),
+            ExecutionType.GUARANTEED, ContainerState.RUNNING, "", 0),
+        ContainerStatus.newInstance(ac2.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Node heartbeat reporting that node utilization is now 4 GB; we should
+    // NOT be able to launch another container.
+    nm2.nodeHeartbeat(true, ResourceUtilization.newInstance(4 * GB, 0 * GB, 0.2f),
+        ResourceUtilization.newInstance(4 * GB, 0 * GB, 0.2f));
+    // Wait for the scheduler to process all events.
+    dispatcher.waitForEventThreadToWait();
+
+    Thread.sleep(200);
+    allocateResponse = am1.allocate(null, null);
+    Assert.assertEquals(0, allocateResponse.getAllocatedContainers().size());
+
+    // Node heartbeat reporting that node utilization has gone down to 3 GB;
+    // now we should be able to launch another container.
+    nm2.nodeHeartbeat(true, ResourceUtilization.newInstance(3 * GB, 0 * GB, 0.2f),
+        ResourceUtilization.newInstance(3 * GB, 0 * GB, 0.2f));
+    // Wait for the scheduler to process all events.
+    dispatcher.waitForEventThreadToWait();
+
+    Thread.sleep(200);
+    allocateResponse = am1.allocate(null, null);
+
+    Assert.assertEquals(1, allocateResponse.getAllocatedContainers().size());
+    Container ac3 = allocateResponse.getAllocatedContainers().get(0);
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC, ac3.getExecutionType());
+  }
+
   @Test(timeout = 600000)
   public void testContainerPromoteAndDemoteBeforeContainerStart()
       throws Exception {
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
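
A sanity check on the numbers in testContainerOverAllocation, under one plausible reading of the availability check added to commonCheckContainerAllocation (available = unallocated + over-allocation headroom; the headroom formula below is an assumption for illustration, not lifted from the patch). nm2 has 5120 MB with a 0.95 threshold, and placing the GUARANTEED 3 GB container leaves 2048 MB unallocated:

    // Illustrative arithmetic only; the availability model is an assumption.
    long capacityMb = 5120;
    long unallocatedMb = 5120 - 3072;               // after the GUARANTEED 3 GB
    long ceilingMb = Math.round(0.95f * capacityMb); // 4864 MB utilization ceiling

    // Heartbeat reports 4 GB used:
    //   2048 + (4864 - 4096) = 2816 MB < 3072 -> request rejected.
    long availableAt4Gb = unallocatedMb + (ceilingMb - 4096);

    // Heartbeat reports 3 GB used:
    //   2048 + (4864 - 3072) = 3840 MB >= 3072 -> OPPORTUNISTIC container fits.
    long availableAt3Gb = unallocatedMb + (ceilingMb - 3072);

This matches the reject-then-allocate behavior the test asserts across its final two heartbeats.
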
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index c43069bac08..fbc72376696 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -312,7 +312,7 @@ public void testNMTokensRebindOnAMRestart() throws Exception {
     ContainerId containerId3 =
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
     rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
-    
+
     // fail am1
     nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
     rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
@@ -658,7 +658,7 @@ public void testRMRestartOrFailoverNotCountedForAMFailures()
     status.setContainerId(attempt2.getMasterContainer().getId());
     status.setContainerState(ContainerState.COMPLETE);
     status.setDiagnostics("");
-    nm1.registerNode(Collections.singletonList(status), null);
+    nm1.registerNode(Collections.singletonList(status), null, null);

     rm2.waitForState(attempt2.getAppAttemptId(), RMAppAttemptState.FAILED);
     Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
@@ -704,7 +704,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
     // we will verify the app should be failed if
     // two continuous attempts failed in 60s.
     RMApp app = rm1.submitApp(200, 60000, false);
-    
+
     MockAM am = MockRM.launchAM(app, rm1, nm1);
     // Fail current attempt normally
     nm1.nodeHeartbeat(am.getApplicationAttemptId(),
@@ -778,7 +778,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
     status.setContainerId(attempt3.getMasterContainer().getId());
     status.setContainerState(ContainerState.COMPLETE);
     status.setDiagnostics("");
-    nm1.registerNode(Collections.singletonList(status), null);
+    nm1.registerNode(Collections.singletonList(status), null, null);

     rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
     //Wait to make sure attempt3 be removed in State Store
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 5e6548bc80e..37ddc969ab9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -228,7 +228,7 @@ public void testReservation() throws Exception {
         any(String.class));
     rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
-    a.submitApplicationAttempt(app_0, user_0); 
+    a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
@@ -237,7 +237,7 @@ public void testReservation() throws Exception {
     app_1 = spy(app_1);
     Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
         any(String.class));
-    a.submitApplicationAttempt(app_1, user_0); 
+    a.submitApplicationAttempt(app_1, user_0);

     // Setup some nodes
     String host_0 = "host_0";
@@ -565,7 +565,7 @@ public void testReservationNoContinueLook() throws Exception {
         any(String.class));
     rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
-    a.submitApplicationAttempt(app_0, user_0); 
+    a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
@@ -574,7 +574,7 @@ public void testReservationNoContinueLook() throws Exception {
     app_1 = spy(app_1);
     Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
         any(String.class));
-    a.submitApplicationAttempt(app_1, user_0); 
+    a.submitApplicationAttempt(app_1, user_0);

     // Setup some nodes
     String host_0 = "host_0";
@@ -748,7 +748,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
         any(String.class));
     rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
-    a.submitApplicationAttempt(app_0, user_0); 
+    a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
@@ -757,7 +757,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
     app_1 = spy(app_1);
     Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
         any(String.class));
-    a.submitApplicationAttempt(app_1, user_0); 
+    a.submitApplicationAttempt(app_1, user_0);

     // Setup some nodes
     String host_0 = "host_0";
@@ -896,7 +896,7 @@ public void testGetAppToUnreserve() throws Exception {
     String host_1 = "host_1";
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
         8 * GB);
-    
+
     Resource clusterResource = Resources.createResource(2 * 8 * GB);

     // Setup resource-requests
@@ -1037,7 +1037,7 @@ public void testAssignToQueue() throws Exception {
         any(String.class));
     rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
-    a.submitApplicationAttempt(app_0, user_0); 
+    a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
@@ -1046,7 +1046,7 @@ public void testAssignToQueue() throws Exception {
     app_1 = spy(app_1);
     Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
         any(String.class));
-    a.submitApplicationAttempt(app_1, user_0); 
+    a.submitApplicationAttempt(app_1, user_0);

     // Setup some nodes
     String host_0 = "host_0";
@@ -1149,7 +1149,7 @@ public void testAssignToQueue() throws Exception {
         new ResourceLimits(Resources.createResource(13 * GB));
     boolean res = a.canAssignToThisQueue(Resources.createResource(13 * GB),
-        RMNodeLabelsManager.NO_LABEL, limits,
+        RMNodeLabelsManager.NO_LABEL, limits, Resources.none(),
         Resources.createResource(3 * GB),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertTrue(res);
@@ -1165,7 +1165,7 @@ public void testAssignToQueue() throws Exception {
         new ResourceLimits(Resources.createResource(13 * GB));
     res = a.canAssignToThisQueue(Resources.createResource(13 * GB),
-        RMNodeLabelsManager.NO_LABEL, limits,
+        RMNodeLabelsManager.NO_LABEL, limits, Resources.none(),
         Resources.createResource(3 * GB),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
@@ -1225,7 +1225,7 @@ public void testAssignToUser() throws Exception {
     Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
         any(String.class));
     rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
-    a.submitApplicationAttempt(app_0, user_0); 
+    a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
@@ -1234,7 +1234,7 @@ public void testAssignToUser() throws Exception {
     app_1 = spy(app_1);
     Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
         any(String.class));
-    a.submitApplicationAttempt(app_1, user_0); 
+    a.submitApplicationAttempt(app_1, user_0);

     // Setup some nodes
     String host_0 = "host_0";
@@ -1387,7 +1387,7 @@ public void testReservationsNoneAvailable() throws Exception {
         any(String.class));
     rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
-    a.submitApplicationAttempt(app_0, user_0); 
+    a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
@@ -1396,7 +1396,7 @@ public void testReservationsNoneAvailable() throws Exception {
     app_1 = spy(app_1);
     Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
         any(String.class));
-    a.submitApplicationAttempt(app_1, user_0); 
+    a.submitApplicationAttempt(app_1, user_0);

     // Setup some nodes
     String host_0 = "host_0";
@@ -1488,7 +1488,7 @@ public void testReservationsNoneAvailable() throws Exception {
     // try to assign reducer (5G on node 0), but tell it's resource limits <
    // used (8G) + required (5G). It will not reserved since it has to unreserve
-    // some resource. Even with continous reservation looking, we don't allow
+    // some resource. Even with continuous reservation looking, we don't allow
     // unreserve resource to reserve container.
     TestUtils.applyResourceCommitRequest(clusterResource,
         a.assignContainers(clusterResource, node_0,
@@ -1499,7 +1499,7 @@ public void testReservationsNoneAvailable() throws Exception {
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
     assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
     assertEquals(16 * GB, a.getMetrics().getAvailableMB());
-    // app_0's headroom = limit (10G) - used (8G) = 2G 
+    // app_0's headroom = limit (10G) - used (8G) = 2G
     assertEquals(2 * GB, app_0.getHeadroom().getMemorySize());
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
@@ -1517,7 +1517,7 @@ public void testReservationsNoneAvailable() throws Exception {
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
     assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
     assertEquals(16 * GB, a.getMetrics().getAvailableMB());
-    // app_0's headroom = limit (10G) - used (8G) = 2G 
+    // app_0's headroom = limit (10G) - used (8G) = 2G
     assertEquals(2 * GB, app_0.getHeadroom().getMemorySize());
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());