diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 03fc40e..32f2b81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -105,7 +105,7 @@
   public RMActiveServiceContext() {

   }
-
+
   @Private
   @Unstable
   public RMActiveServiceContext(Dispatcher rmDispatcher,
@@ -130,7 +130,6 @@ public RMActiveServiceContext(Dispatcher rmDispatcher,
     this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
     this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
     this.setScheduler(scheduler);
-
     RMStateStore nullStore = new NullRMStateStore();
     nullStore.setRMDispatcher(rmDispatcher);
     try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 1d0d6c0..75abc98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -95,8 +95,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
         containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
         delegationTokenRenewer, appTokenSecretManager,
         containerTokenSecretManager, nmTokenSecretManager,
-        clientToAMTokenSecretManager, rmApplicationHistoryWriter,
-        scheduler));
+        clientToAMTokenSecretManager, rmApplicationHistoryWriter, scheduler));

     ConfigurationProvider provider = new LocalConfigurationProvider();
     setConfigurationProvider(provider);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index fd8a7ee..035cedd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -42,16 +42,12 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
@@ -61,7 +57,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -760,26 +755,11 @@ private synchronized FiCaSchedulerApp getApplication(
     return applicationAttemptMap.get(applicationAttemptId);
   }

-  private static final CSAssignment NULL_ASSIGNMENT =
+  public static final CSAssignment NULL_ASSIGNMENT =
       new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);

-  private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-
-  private static Set<String> getRequestLabelSetByExpression(
-      String labelExpression) {
-    Set<String> labels = new HashSet<String>();
-    if (null == labelExpression) {
-      return labels;
-    }
-    for (String l : labelExpression.split("&&")) {
-      if (l.trim().isEmpty()) {
-        continue;
-      }
-      labels.add(l.trim());
-    }
-    return labels;
-  }
-
+  public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, boolean needToUnreserve) {
@@ -798,151 +778,33 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,

     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
-      FiCaSchedulerApp application =
+      FiCaSchedulerApp application =
           getApplication(reservedContainer.getApplicationAttemptId());
-      synchronized (application) {
-        return assignReservedContainer(application, node, reservedContainer,
-            clusterResource);
-      }
+      return application.assignReservedContainer(application, node, reservedContainer,
+          clusterResource);
     }

     // Try to assign containers to applications in order
     for (FiCaSchedulerApp application : activeApplications) {
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("pre-assignContainers for application "
-            + application.getApplicationId());
-        application.showRequests();
-      }
+      CSAssignment assignment =
+          application.assignContainers(node, clusterResource, needToUnreserve);

-      synchronized (application) {
-        // Check if this resource is on the blacklist
-        if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
-          continue;
-        }
+      // Update allocated resource in this Queue
+      Resource assigned = assignment.getResource();
+      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
+          Resources.none())) {
+        // Book-keeping
+        // Note: Update headroom to account for current allocation too...
+        allocateResource(clusterResource, application, assigned,
+            node.getLabels());

-        // Schedule in priority order
-        for (Priority priority : application.getPriorities()) {
-          ResourceRequest anyRequest =
-              application.getResourceRequest(priority, ResourceRequest.ANY);
-          if (null == anyRequest) {
-            continue;
-          }
-
-          // Required resource
-          Resource required = anyRequest.getCapability();
-
-          // Do we need containers at this 'priority'?
-          if (application.getTotalRequiredResources(priority) <= 0) {
-            continue;
-          }
-          if (!this.reservationsContinueLooking) {
-            if (!needContainers(application, priority, required)) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("doesn't need containers based on reservation algo!");
-              }
-              continue;
-            }
-          }
-
-          Set<String> requestedNodeLabels =
-              getRequestLabelSetByExpression(anyRequest
-                  .getNodeLabelExpression());
-
-          // Compute user-limit & set headroom
-          // Note: We compute both user-limit & headroom with the highest
-          //       priority request as the target.
-          //       This works since we never assign lower priority requests
-          //       before all higher priority ones are serviced.
-          Resource userLimit =
-              computeUserLimitAndSetHeadroom(application, clusterResource,
-                  required, requestedNodeLabels);
-
-          // Check queue max-capacity limit
-          if (!canAssignToThisQueue(clusterResource, required,
-              node.getLabels(), application, true)) {
-            return NULL_ASSIGNMENT;
-          }
-
-          // Check user limit
-          if (!assignToUser(clusterResource, application.getUser(), userLimit,
-              application, true, requestedNodeLabels)) {
-            break;
-          }
-
-          // Inform the application it is about to get a scheduling opportunity
-          application.addSchedulingOpportunity(priority);
-
-          // Try to schedule
-          CSAssignment assignment =
-              assignContainersOnNode(clusterResource, node, application, priority,
-                  null, needToUnreserve);
-
-          // Did the application skip this node?
-          if (assignment.getSkipped()) {
-            // Don't count 'skipped nodes' as a scheduling opportunity!
-            application.subtractSchedulingOpportunity(priority);
-            continue;
-          }
-
-          // Did we schedule or reserve a container?
-          Resource assigned = assignment.getResource();
-          if (Resources.greaterThan(
-              resourceCalculator, clusterResource, assigned, Resources.none())) {
-
-            // Book-keeping
-            // Note: Update headroom to account for current allocation too...
-            allocateResource(clusterResource, application, assigned,
-                node.getLabels());
-
-            // Don't reset scheduling opportunities for non-local assignments
-            // otherwise the app will be delayed for each non-local assignment.
-            // This helps apps with many off-cluster requests schedule faster.
-            if (assignment.getType() != NodeType.OFF_SWITCH) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Resetting scheduling opportunities");
-              }
-              application.resetSchedulingOpportunities(priority);
-            }
-
-            // Done
-            return assignment;
-          } else {
-            // Do not assign out of order w.r.t priorities
-            break;
-          }
-        }
-      }
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("post-assignContainers for application "
-            + application.getApplicationId());
+        return assignment;
       }
-      application.showRequests();
     }

     return NULL_ASSIGNMENT;
   }
-
-  private synchronized CSAssignment
-      assignReservedContainer(FiCaSchedulerApp application,
-      FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
-    // Do we still need this reservation?
-    Priority priority = rmContainer.getReservedPriority();
-    if (application.getTotalRequiredResources(priority) == 0) {
-      // Release
-      return new CSAssignment(application, rmContainer);
-    }
-
-    // Try to assign if we have sufficient resources
-    assignContainersOnNode(clusterResource, node, application, priority,
-        rmContainer, false);
-
-    // Doesn't matter... since it's already charged for at time of reservation
-    // "re-reservation" is *free*
-    return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-  }

   protected Resource getHeadroom(User user, Resource queueMaxCap,
       Resource clusterResource, FiCaSchedulerApp application, Resource required) {
@@ -977,7 +839,7 @@ private Resource getHeadroom(User user, Resource queueMaxCap,
     return headroom;
   }

-  synchronized boolean canAssignToThisQueue(Resource clusterResource,
+  public synchronized boolean canAssignToThisQueue(Resource clusterResource,
       Resource required, Set<String> nodeLabels, FiCaSchedulerApp application,
       boolean checkReservations) {
     // Get label of this queue can access, it's (nodeLabel AND queueLabel)
@@ -1081,7 +943,7 @@ private Resource updateHeadroomInfo(Resource clusterResource,
   }

   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
-  Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
+  public Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
       Resource clusterResource, Resource required, Set<String> requestedLabels) {
     String user = application.getUser();
     User queueUser = getUser(user);
@@ -1213,7 +1075,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
   }

   @Private
-  protected synchronized boolean assignToUser(Resource clusterResource,
+  public synchronized boolean assignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
       boolean checkReservations, Set<String> requestLabels) {
     User user = getUser(userName);
@@ -1257,464 +1119,11 @@ protected synchronized boolean assignToUser(Resource clusterResource,
     return true;
   }

-  boolean needContainers(FiCaSchedulerApp application, Priority priority,
-      Resource required) {
-    int requiredContainers = application.getTotalRequiredResources(priority);
-    int reservedContainers = application.getNumReservedContainers(priority);
-    int starvation = 0;
-    if (reservedContainers > 0) {
-      float nodeFactor =
-          Resources.ratio(
-              resourceCalculator, required, getMaximumAllocation()
-              );
-
-      // Use percentage of node required to bias against large containers...
-      // Protect against corner case where you need the whole node with
-      // Math.min(nodeFactor, minimumAllocationFactor)
-      starvation =
-          (int)((application.getReReservations(priority) / (float)reservedContainers) *
-              (1.0f - (Math.min(nodeFactor, getMinimumAllocationFactor())))
-              );
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("needsContainers:" +
-            " app.#re-reserve=" + application.getReReservations(priority) +
-            " reserved=" + reservedContainers +
-            " nodeFactor=" + nodeFactor +
-            " minAllocFactor=" + getMinimumAllocationFactor() +
-            " starvation=" + starvation);
-      }
-    }
-    return (((starvation + requiredContainers) - reservedContainers) > 0);
-  }
-
-  private CSAssignment assignContainersOnNode(Resource clusterResource,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve) {
-    Resource assigned = Resources.none();
-
-    // Data-local
-    ResourceRequest nodeLocalResourceRequest =
-        application.getResourceRequest(priority, node.getNodeName());
-    if (nodeLocalResourceRequest != null) {
-      assigned =
-          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
-              node, application, priority, reservedContainer, needToUnreserve);
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          assigned, Resources.none())) {
-        return new CSAssignment(assigned, NodeType.NODE_LOCAL);
-      }
-    }
-
-    // Rack-local
-    ResourceRequest rackLocalResourceRequest =
-        application.getResourceRequest(priority, node.getRackName());
-    if (rackLocalResourceRequest != null) {
-      if (!rackLocalResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-
-      assigned =
-          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
-              node, application, priority, reservedContainer, needToUnreserve);
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          assigned, Resources.none())) {
-        return new CSAssignment(assigned, NodeType.RACK_LOCAL);
-      }
-    }
-
-    // Off-switch
-    ResourceRequest offSwitchResourceRequest =
-        application.getResourceRequest(priority, ResourceRequest.ANY);
-    if (offSwitchResourceRequest != null) {
-      if (!offSwitchResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-
-      return new CSAssignment(
-          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-              node, application, priority, reservedContainer, needToUnreserve),
-          NodeType.OFF_SWITCH);
-    }
-
-    return SKIP_ASSIGNMENT;
-  }
-
-  @Private
-  protected boolean findNodeToUnreserve(Resource clusterResource,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      Resource capability) {
-    // need to unreserve some other container first
-    NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
-    if (idToUnreserve == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("checked to see if could unreserve for app but nothing "
-            + "reserved that matches for this app");
-      }
-      return false;
-    }
-    FiCaSchedulerNode nodeToUnreserve = scheduler.getNode(idToUnreserve);
-    if (nodeToUnreserve == null) {
-      LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
-      return false;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("unreserving for app: " + application.getApplicationId()
-          + " on nodeId: " + idToUnreserve
-          + " in order to replace reserved application and place it on node: "
-          + node.getNodeID() + " needing: " + capability);
-    }
-
-    // headroom
-    Resources.addTo(application.getHeadroom(), nodeToUnreserve
-        .getReservedContainer().getReservedResource());
-
-    // Make sure to not have completedContainers sort the queues here since
-    // we are already inside an iterator loop for the queues and this would
-    // cause an concurrent modification exception.
-    completedContainer(clusterResource, application, nodeToUnreserve,
-        nodeToUnreserve.getReservedContainer(),
-        SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve
-            .getReservedContainer().getContainerId(),
-            SchedulerUtils.UNRESERVED_CONTAINER),
-        RMContainerEventType.RELEASED, null, false);
-    return true;
-  }
-
-  @Private
-  protected boolean checkLimitsToReserve(Resource clusterResource,
-      FiCaSchedulerApp application, Resource capability,
-      boolean needToUnreserve) {
-    if (needToUnreserve) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("we needed to unreserve to be able to allocate");
-      }
-      return false;
-    }
-
-    // we can't reserve if we got here based on the limit
-    // checks assuming we could unreserve!!!
-    Resource userLimit = computeUserLimitAndSetHeadroom(application,
-        clusterResource, capability, null);
-
-    // Check queue max-capacity limit,
-    // TODO: Consider reservation on labels
-    if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("was going to reserve but hit queue limit");
-      }
-      return false;
-    }
-
-    // Check user limit
-    if (!assignToUser(clusterResource, application.getUser(), userLimit,
-        application, false, null)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("was going to reserve but hit user limit");
-      }
-      return false;
-    }
-    return true;
-  }
-
-
-  private Resource assignNodeLocalContainers(Resource clusterResource,
-      ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve) {
-    if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          needToUnreserve);
-    }
-
-    return Resources.none();
-  }
-
-  private Resource assignRackLocalContainers(
-      Resource clusterResource, ResourceRequest rackLocalResourceRequest,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve) {
-    if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          needToUnreserve);
-    }
-
-    return Resources.none();
-  }
-
-  private Resource assignOffSwitchContainers(
-      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve) {
-    if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          needToUnreserve);
-    }
-
-    return Resources.none();
-  }
-
-  boolean canAssign(FiCaSchedulerApp application, Priority priority,
-      FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
-
-    // Clearly we need containers for this application...
-    if (type == NodeType.OFF_SWITCH) {
-      if (reservedContainer != null) {
-        return true;
-      }
-
-      // 'Delay' off-switch
-      ResourceRequest offSwitchRequest =
-          application.getResourceRequest(priority, ResourceRequest.ANY);
-      long missedOpportunities = application.getSchedulingOpportunities(priority);
-      long requiredContainers = offSwitchRequest.getNumContainers();
-
-      float localityWaitFactor =
-          application.getLocalityWaitFactor(priority,
-              scheduler.getNumClusterNodes());
-
-      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
-    }
-
-    // Check if we need containers on this rack
-    ResourceRequest rackLocalRequest =
-        application.getResourceRequest(priority, node.getRackName());
-    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
-      return false;
-    }
-
-    // If we are here, we do need containers on this rack for RACK_LOCAL req
-    if (type == NodeType.RACK_LOCAL) {
-      // 'Delay' rack-local just a little bit...
-      long missedOpportunities = application.getSchedulingOpportunities(priority);
-      return (
-          Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay()) <
-          missedOpportunities
-          );
-    }
-
-    // Check if we need containers on this host
-    if (type == NodeType.NODE_LOCAL) {
-      // Now check if we need containers on this host...
-      ResourceRequest nodeLocalRequest =
-          application.getResourceRequest(priority, node.getNodeName());
-      if (nodeLocalRequest != null) {
-        return nodeLocalRequest.getNumContainers() > 0;
-      }
-    }
-
-    return false;
-  }
-
-  private Container getContainer(RMContainer rmContainer,
-      FiCaSchedulerApp application, FiCaSchedulerNode node,
-      Resource capability, Priority priority) {
-    return (rmContainer != null) ? rmContainer.getContainer() :
-        createContainer(application, node, capability, priority);
-  }
-
-  Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
-      Resource capability, Priority priority) {
-
-    NodeId nodeId = node.getRMNode().getNodeID();
-    ContainerId containerId = BuilderUtils.newContainerId(application
-        .getApplicationAttemptId(), application.getNewContainerId());
-
-    // Create the container
-    Container container =
-        BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-            .getHttpAddress(), capability, priority, null);
-
-    return container;
-  }
-
-
-  private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      ResourceRequest request, NodeType type, RMContainer rmContainer,
-      boolean needToUnreserve) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getNodeName()
-          + " application=" + application.getApplicationId()
-          + " priority=" + priority.getPriority()
-          + " request=" + request + " type=" + type
-          + " needToUnreserve= " + needToUnreserve);
-    }
-
-    // check if the resource request can access the label
-    if (!SchedulerUtils.checkNodeLabelExpression(
-        node.getLabels(),
-        request.getNodeLabelExpression())) {
-      // this is a reserved container, but we cannot allocate it now according
-      // to label not match. This can be caused by node label changed
-      // We should un-reserve this container.
-      if (rmContainer != null) {
-        unreserve(application, priority, node, rmContainer);
-      }
-      return Resources.none();
-    }
-
-    Resource capability = request.getCapability();
-    Resource available = node.getAvailableResource();
-    Resource totalResource = node.getTotalResource();
-
-    if (!Resources.fitsIn(capability, totalResource)) {
-      LOG.warn("Node : " + node.getNodeID()
-          + " does not have sufficient resource for request : " + request
-          + " node total capability : " + node.getTotalResource());
-      return Resources.none();
-    }
-    assert Resources.greaterThan(
-        resourceCalculator, clusterResource, available, Resources.none());
-
-    // Create the container if necessary
-    Container container =
-        getContainer(rmContainer, application, node, capability, priority);
-
-    // something went wrong getting/creating the container
-    if (container == null) {
-      LOG.warn("Couldn't get container for allocation!");
-      return Resources.none();
-    }
-
-    // default to true since if reservation continue look feature isn't on
-    // needContainers is checked earlier and we wouldn't have gotten this far
-    boolean canAllocContainer = true;
-    if (this.reservationsContinueLooking) {
-      // based on reservations can we allocate/reserve more or do we need
-      // to unreserve one first
-      canAllocContainer = needContainers(application, priority, capability);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("can alloc container is: " + canAllocContainer);
-      }
-    }
-
-    // Can we allocate a container on this node?
-    int availableContainers =
-        resourceCalculator.computeAvailableContainers(available, capability);
-    if (availableContainers > 0) {
-      // Allocate...
-
-      // Did we previously reserve containers at this 'priority'?
-      if (rmContainer != null) {
-        unreserve(application, priority, node, rmContainer);
-      } else if (this.reservationsContinueLooking
-          && (!canAllocContainer || needToUnreserve)) {
-        // need to unreserve some other container first
-        boolean res = findNodeToUnreserve(clusterResource, node, application,
-            priority, capability);
-        if (!res) {
-          return Resources.none();
-        }
-      } else {
-        // we got here by possibly ignoring queue capacity limits. If the
-        // parameter needToUnreserve is true it means we ignored one of those
-        // limits in the chance we could unreserve. If we are here we aren't
-        // trying to unreserve so we can't allocate anymore due to that parent
-        // limit.
-        if (needToUnreserve) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("we needed to unreserve to be able to allocate, skipping");
-          }
-          return Resources.none();
-        }
-      }
-
-      // Inform the application
-      RMContainer allocatedContainer =
-          application.allocate(type, node, priority, request, container);
-
-      // Does the application need this resource?
-      if (allocatedContainer == null) {
-        return Resources.none();
-      }
-
-      // Inform the node
-      node.allocateContainer(allocatedContainer);
-
-      LOG.info("assignedContainer" +
-          " application attempt=" + application.getApplicationAttemptId() +
-          " container=" + container +
-          " queue=" + this +
-          " clusterResource=" + clusterResource);
-
-      return container.getResource();
-    } else {
-      // if we are allowed to allocate but this node doesn't have space, reserve it or
-      // if this was an already a reserved container, reserve it again
-      if ((canAllocContainer) || (rmContainer != null)) {
-
-        if (reservationsContinueLooking) {
-          // we got here by possibly ignoring parent queue capacity limits. If
-          // the parameter needToUnreserve is true it means we ignored one of
-          // those limits in the chance we could unreserve. If we are here
-          // we aren't trying to unreserve so we can't allocate
-          // anymore due to that parent limit
-          boolean res = checkLimitsToReserve(clusterResource, application, capability,
-              needToUnreserve);
-          if (!res) {
-            return Resources.none();
-          }
-        }
-
-        // Reserve by 'charging' in advance...
-        reserve(application, priority, node, rmContainer, container);
-
-        LOG.info("Reserved container " +
-            " application=" + application.getApplicationId() +
-            " resource=" + request.getCapability() +
-            " queue=" + this.toString() +
-            " usedCapacity=" + getUsedCapacity() +
-            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + usedResources +
-            " cluster=" + clusterResource);
-
-        return request.getCapability();
-      }
-      return Resources.none();
-    }
-  }
-
-  private void reserve(FiCaSchedulerApp application, Priority priority,
-      FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
-    // Update reserved metrics if this is the first reservation
-    if (rmContainer == null) {
-      getMetrics().reserveResource(
-          application.getUser(), container.getResource());
-    }
-
-    // Inform the application
-    rmContainer = application.reserve(node, priority, rmContainer, container);
-
-    // Update the node
-    node.reserveResource(application, priority, rmContainer);
-  }
-
-  private boolean unreserve(FiCaSchedulerApp application, Priority priority,
-      FiCaSchedulerNode node, RMContainer rmContainer) {
-    // Done with the reservation?
-    if (application.unreserve(node, priority)) {
-      node.unreserveResource(application);
-
-      // Update reserved metrics
-      getMetrics().unreserveResource(application.getUser(),
-          rmContainer.getContainer().getResource());
-      return true;
-    }
-    return false;
-  }
-
   @Override
-  public void completedContainer(Resource clusterResource,
-      FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
-      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
-      boolean sortQueues) {
+  public void completedContainer(Resource clusterResource,
+      FiCaSchedulerApp application, FiCaSchedulerNode node,
+      RMContainer rmContainer, ContainerStatus containerStatus,
+      RMContainerEventType event, CSQueue childQueue, boolean sortQueues) {
     if (application != null) {
       boolean removed = false;

@@ -1729,7 +1138,7 @@ public void completedContainer(Resource clusterResource,
       // happen under scheduler's lock...
       // So, this is, in effect, a transaction across application & node
       if (rmContainer.getState() == RMContainerState.RESERVED) {
-        removed = unreserve(application, rmContainer.getReservedPriority(),
+        removed = application.unreserve(rmContainer.getReservedPriority(),
             node, rmContainer);
       } else {
         removed =
@@ -1756,7 +1165,7 @@ public void completedContainer(Resource clusterResource,
     }
   }

-  synchronized void allocateResource(Resource clusterResource,
+  public synchronized void allocateResource(Resource clusterResource,
       SchedulerApplicationAttempt application, Resource resource,
       Set<String> nodeLabels) {
     super.allocateResource(clusterResource, resource, nodeLabels);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9f97b13..85886b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -48,11 +49,18 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;

 /**
  * Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -68,14 +76,18 @@
       new HashSet();

   private CapacityHeadroomProvider headroomProvider;
+  private CapacitySchedulerContext csContext;
+  private ResourceCalculator resourceCalculator;
+  private float minimumAllocationFactor;
+  private LeafQueue leafQueue;

   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+    // Set AM resource of this application

     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
-    Resource amResource;
     if (rmApp == null || rmApp.getAMResourceRequest() == null) {
       //the rmApp may be undefined (the resource manager checks for this too)
@@ -85,11 +97,21 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
     } else {
       amResource = rmApp.getAMResourceRequest().getCapability();
     }
-    setAMResource(amResource);
+
+    if (rmContext.getScheduler() instanceof CapacitySchedulerContext) {
+      csContext = (CapacitySchedulerContext) rmContext.getScheduler();
+      resourceCalculator = csContext.getResourceCalculator();
+      minimumAllocationFactor =
+          Resources.ratio(resourceCalculator, Resources.subtract(
+              csContext.getMaximumResourceCapability(),
+              csContext.getMinimumResourceCapability()), csContext
+              .getMaximumResourceCapability());
+      leafQueue = (LeafQueue)queue;
+    }
   }

-  synchronized public boolean containerCompleted(RMContainer rmContainer,
+  public synchronized boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {

     // Remove from the list of containers
@@ -130,7 +152,588 @@ synchronized public boolean containerCompleted(RMContainer rmContainer,
     return true;
   }

-  synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+  private boolean canAssign(Priority priority,
+      FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
+
+    // Clearly we need containers for this application...
+    if (type == NodeType.OFF_SWITCH) {
+      if (reservedContainer != null) {
+        return true;
+      }
+
+      // 'Delay' off-switch
+      ResourceRequest offSwitchRequest =
+          getResourceRequest(priority, ResourceRequest.ANY);
+      long missedOpportunities = getSchedulingOpportunities(priority);
+      long requiredContainers = offSwitchRequest.getNumContainers();
+
+      float localityWaitFactor = getLocalityWaitFactor(priority,
+          csContext.getNumClusterNodes());
+
+      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
+    }
+
+    // Check if we need containers on this rack
+    ResourceRequest rackLocalRequest =
+        getResourceRequest(priority, node.getRackName());
+    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+      return false;
+    }
+
+    // If we are here, we do need containers on this rack for RACK_LOCAL req
+    if (type == NodeType.RACK_LOCAL) {
+      // 'Delay' rack-local just a little bit...
+      long missedOpportunities = getSchedulingOpportunities(priority);
+      return (Math.min(csContext.getNumClusterNodes(), csContext
+          .getConfiguration().getNodeLocalityDelay()) < missedOpportunities);
+    }
+
+    // Check if we need containers on this host
+    if (type == NodeType.NODE_LOCAL) {
+      // Now check if we need containers on this host...
+      ResourceRequest nodeLocalRequest =
+          getResourceRequest(priority, node.getNodeName());
+      if (nodeLocalRequest != null) {
+        return nodeLocalRequest.getNumContainers() > 0;
+      }
+    }
+
+    return false;
+  }
+
+  private Resource assignNodeLocalContainers(Resource clusterResource,
+      ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
+      Priority priority,
+      RMContainer reservedContainer, boolean needToUnreserve) {
+    if (canAssign(priority, node, NodeType.NODE_LOCAL,
+        reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
+          needToUnreserve);
+    }
+
+    return Resources.none();
+  }
+
+  private Resource assignRackLocalContainers(
+      Resource clusterResource, ResourceRequest rackLocalResourceRequest,
+      FiCaSchedulerNode node, Priority priority,
+      RMContainer reservedContainer, boolean needToUnreserve) {
+    if (canAssign(priority, node, NodeType.RACK_LOCAL,
+        reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
+          needToUnreserve);
+    }
+
+    return Resources.none();
+  }
+
+  private Resource assignOffSwitchContainers(
+      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
+      FiCaSchedulerNode node, Priority priority,
+      RMContainer reservedContainer, boolean needToUnreserve) {
+    if (canAssign(priority, node, NodeType.OFF_SWITCH,
+        reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
+          needToUnreserve);
+    }
+
+    return Resources.none();
+  }
+
+  private Container getContainer(RMContainer rmContainer,
+      FiCaSchedulerNode node, Resource capability, Priority priority) {
+    return (rmContainer != null) ? rmContainer.getContainer()
+        : createContainer(node, capability, priority);
+  }
+
+  private Container createContainer(FiCaSchedulerNode node, Resource capability,
+      Priority priority) {
+
+    NodeId nodeId = node.getRMNode().getNodeID();
+    ContainerId containerId =
+        BuilderUtils.newContainerId(getApplicationAttemptId(),
+            getNewContainerId());
+
+    // Create the container
+    Container container =
+        BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
+            .getHttpAddress(), capability, priority, null);
+
+    return container;
+  }
+
+  private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
+      Priority priority,
+      ResourceRequest request, NodeType type, RMContainer rmContainer,
+      boolean needToUnreserve) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("assignContainers: node=" + node.getNodeName()
+          + " application=" + getApplicationId()
+          + " priority=" + priority.getPriority()
+          + " request=" + request + " type=" + type
+          + " needToUnreserve= " + needToUnreserve);
+    }
+
+    // check if the resource request can access the label
+    if (!SchedulerUtils.checkNodeLabelExpression(
+        node.getLabels(),
+        request.getNodeLabelExpression())) {
+      // This is a reserved container, but we cannot allocate it now because
+      // the node's labels no longer match the request; this can happen when
+      // node labels change. We should un-reserve this container.
+      if (rmContainer != null) {
+        unreserve(priority, node, rmContainer);
+      }
+      return Resources.none();
+    }
+
+    Resource capability = request.getCapability();
+    Resource available = node.getAvailableResource();
+    Resource totalResource = node.getTotalResource();
+
+    if (!Resources.fitsIn(capability, totalResource)) {
+      LOG.warn("Node : " + node.getNodeID()
+          + " does not have sufficient resource for request : " + request
+          + " node total capability : " + node.getTotalResource());
+      return Resources.none();
+    }
+    assert Resources.greaterThan(
+        resourceCalculator, clusterResource, available, Resources.none());
+
+    // Create the container if necessary
+    Container container =
+        getContainer(rmContainer, node, capability, priority);
+
+    // something went wrong getting/creating the container
+    if (container == null) {
+      LOG.warn("Couldn't get container for allocation!");
+      return Resources.none();
+    }
+
+    boolean reservationsContinueLooking =
+        csContext.getConfiguration().getReservationContinueLook();
+
+    // default to true since if reservation continue look feature isn't on
+    // needContainers is checked earlier and we wouldn't have gotten this far
+    boolean canAllocContainer = true;
+    if (reservationsContinueLooking) {
+      // based on reservations can we allocate/reserve more or do we need
+      // to unreserve one first
+      canAllocContainer = needContainers(priority, capability);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("can alloc container is: " + canAllocContainer);
+      }
+    }
+
+    // Can we allocate a container on this node?
+    int availableContainers =
+        resourceCalculator.computeAvailableContainers(available, capability);
+    if (availableContainers > 0) {
+      // Allocate...
+
+      // Did we previously reserve containers at this 'priority'?
+      if (rmContainer != null) {
+        unreserve(priority, node, rmContainer);
+      } else if (reservationsContinueLooking
+          && (!canAllocContainer || needToUnreserve)) {
+        // need to unreserve some other container first
+        boolean res = findNodeToUnreserve(clusterResource, node,
+            priority, capability);
+        if (!res) {
+          return Resources.none();
+        }
+      } else {
+        // we got here by possibly ignoring queue capacity limits. If the
+        // parameter needToUnreserve is true it means we ignored one of those
+        // limits in the chance we could unreserve. If we are here we aren't
+        // trying to unreserve so we can't allocate anymore due to that parent
+        // limit.
+        if (needToUnreserve) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("we needed to unreserve to be able to allocate, skipping");
+          }
+          return Resources.none();
+        }
+      }
+
+      // Inform the application
+      RMContainer allocatedContainer =
+          allocate(type, node, priority, request, container);
+
+      // Does the application need this resource?
+      if (allocatedContainer == null) {
+        return Resources.none();
+      }
+
+      // Inform the node
+      node.allocateContainer(allocatedContainer);
+
+      LOG.info("assignedContainer" +
+          " application attempt=" + getApplicationAttemptId() +
+          " container=" + container +
+          " queue=" + this +
+          " clusterResource=" + clusterResource);
+
+      return container.getResource();
+    } else {
+      // if we are allowed to allocate but this node doesn't have space, reserve
+      // it, or if this was already a reserved container, reserve it again
+      if ((canAllocContainer) || (rmContainer != null)) {
+
+        if (reservationsContinueLooking) {
+          // we got here by possibly ignoring parent queue capacity limits. If
+          // the parameter needToUnreserve is true it means we ignored one of
+          // those limits in the chance we could unreserve. If we are here
+          // we aren't trying to unreserve so we can't allocate
+          // anymore due to that parent limit
+          boolean res = checkLimitsToReserve(clusterResource, capability,
+              needToUnreserve);
+          if (!res) {
+            return Resources.none();
+          }
+        }
+
+        // Reserve by 'charging' in advance...
+        reserve(priority, node, rmContainer, container);
+
+        return request.getCapability();
+      }
+      return Resources.none();
+    }
+  }
+
+  public synchronized CSAssignment assignReservedContainer(
+      FiCaSchedulerApp application, FiCaSchedulerNode node,
+      RMContainer rmContainer, Resource clusterResource) {
+    // Do we still need this reservation?
+    Priority priority = rmContainer.getReservedPriority();
+    if (application.getTotalRequiredResources(priority) == 0) {
+      // Release
+      return new CSAssignment(application, rmContainer);
+    }
+
+    // Try to assign if we have sufficient resources
+    assignContainersOnNode(clusterResource, node, priority, rmContainer, false);
+
+    // Doesn't matter... since it's already charged for at time of reservation
+    // "re-reservation" is *free*
+    return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
+  }
+
+  private void reserve(Priority priority,
+      FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
+    // Update reserved metrics if this is the first reservation
+    if (rmContainer == null) {
+      leafQueue.getMetrics().reserveResource(
+          getUser(), container.getResource());
+    }
+
+    // Inform the application
+    rmContainer = super.reserve(node, priority, rmContainer, container);
+
+    // Update the node
+    node.reserveResource(this, priority, rmContainer);
+  }
+
+  public synchronized boolean unreserve(Priority priority,
+      FiCaSchedulerNode node, RMContainer rmContainer) {
+    // Done with the reservation?
+    if (unreserveResource(node, priority)) {
+      node.unreserveResource(this);
+
+      // Update reserved metrics
+      leafQueue.getMetrics().unreserveResource(getUser(),
+          rmContainer.getContainer().getResource());
+      return true;
+    }
+    return false;
+  }
+
+  private boolean checkLimitsToReserve(Resource clusterResource,
+      Resource capability, boolean needToUnreserve) {
+    if (needToUnreserve) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("we needed to unreserve to be able to allocate");
+      }
+      return false;
+    }
+
+    // we can't reserve if we got here based on the limit
+    // checks assuming we could unreserve!!!
+    Resource userLimit = leafQueue.computeUserLimitAndSetHeadroom(this,
+        clusterResource, capability, null);
+
+    // Check queue max-capacity limit,
+    // TODO: Consider reservation on labels
+    if (!leafQueue.canAssignToThisQueue(clusterResource, capability, null,
+        this, false)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("was going to reserve but hit queue limit");
+      }
+      return false;
+    }
+
+    // Check user limit
+    if (!leafQueue.assignToUser(clusterResource, getUser(), userLimit,
+        this, false, null)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("was going to reserve but hit user limit");
+      }
+      return false;
+    }
+    return true;
+  }
+
+  @Private
+  @VisibleForTesting
+  public synchronized boolean findNodeToUnreserve(Resource clusterResource,
+      FiCaSchedulerNode node, Priority priority, Resource capability) {
+    // need to unreserve some other container first
+    NodeId idToUnreserve = getNodeIdToUnreserve(priority, capability);
+    if (idToUnreserve == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("checked to see if could unreserve for app but nothing "
+            + "reserved that matches for this app");
+      }
+      return false;
+    }
+    FiCaSchedulerNode nodeToUnreserve = csContext.getNode(idToUnreserve);
+    if (nodeToUnreserve == null) {
+      LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
+      return false;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("unreserving for app: " + getApplicationId()
+          + " on nodeId: " + idToUnreserve
+          + " in order to replace reserved application and place it on node: "
+          + node.getNodeID() + " needing: " + capability);
+    }
+
+    // headroom
+    Resources.addTo(getHeadroom(), nodeToUnreserve
+        .getReservedContainer().getReservedResource());
+
+    // Make sure to not have completedContainers sort the queues here since
+    // we are already inside an iterator loop for the queues and this would
+    // cause a concurrent modification exception.
+    leafQueue.completedContainer(clusterResource, this, nodeToUnreserve,
+        nodeToUnreserve.getReservedContainer(),
+        SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve
+            .getReservedContainer().getContainerId(),
+            SchedulerUtils.UNRESERVED_CONTAINER),
+        RMContainerEventType.RELEASED, null, false);
+    return true;
+  }
+
+  private boolean needContainers(Priority priority, Resource required) {
+    int requiredContainers = getTotalRequiredResources(priority);
+    int reservedContainers = getNumReservedContainers(priority);
+    int starvation = 0;
+    if (reservedContainers > 0) {
+      float nodeFactor =
+          Resources.ratio(
+              resourceCalculator, required, csContext.getMaximumResourceCapability()
+              );
+
+      // Use percentage of node required to bias against large containers...
+      // Protect against corner case where you need the whole node with
+      // Math.min(nodeFactor, minimumAllocationFactor)
+      starvation =
+          (int)((getReReservations(priority) / (float)reservedContainers) *
+              (1.0f - (Math.min(nodeFactor, minimumAllocationFactor)))
+              );
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("needsContainers:" +
+            " app.#re-reserve=" + getReReservations(priority) +
+            " reserved=" + reservedContainers +
+            " nodeFactor=" + nodeFactor +
+            " minAllocFactor=" + minimumAllocationFactor +
+            " starvation=" + starvation);
+      }
+    }
+    return (((starvation + requiredContainers) - reservedContainers) > 0);
+  }
+
+  private static Set<String> getRequestLabelSetByExpression(
+      String labelExpression) {
+    Set<String> labels = new HashSet<String>();
+    if (null == labelExpression) {
+      return labels;
+    }
+    for (String l : labelExpression.split("&&")) {
+      if (l.trim().isEmpty()) {
+        continue;
+      }
+      labels.add(l.trim());
+    }
+    return labels;
+  }
+
+  public synchronized CSAssignment assignContainers(FiCaSchedulerNode node,
+      Resource clusterResource, boolean needToUnreserve) {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("pre-assignContainers for application "
+          + getApplicationId());
+      showRequests();
+    }
+
+    // Check if this resource is on the blacklist
+    if (SchedulerAppUtils.isBlacklisted(this, node, LOG)) {
+      return LeafQueue.NULL_ASSIGNMENT;
+    }
+
+    // Schedule in priority order
+    for (Priority priority : getPriorities()) {
+      ResourceRequest anyRequest =
+          getResourceRequest(priority, ResourceRequest.ANY);
+      if (null == anyRequest) {
+        continue;
+      }
+
+      // Required resource
+      Resource required = anyRequest.getCapability();
+
+      // Do we need containers at this 'priority'?
+      if (getTotalRequiredResources(priority) <= 0) {
+        continue;
+      }
+      if (!csContext.getConfiguration().getReservationContinueLook()) {
+        if (!needContainers(priority, required)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("doesn't need containers based on reservation algo!");
+          }
+          continue;
+        }
+      }
+
+      Set<String> requestedNodeLabels =
+          getRequestLabelSetByExpression(anyRequest
+              .getNodeLabelExpression());
+
+      // Compute user-limit & set headroom
+      // Note: We compute both user-limit & headroom with the highest
+      //       priority request as the target.
+      //       This works since we never assign lower priority requests
+      //       before all higher priority ones are serviced.
+      Resource userLimit =
+          leafQueue.computeUserLimitAndSetHeadroom(this, clusterResource,
+              required, requestedNodeLabels);
+
+      // Check queue max-capacity limit
+      if (!leafQueue.canAssignToThisQueue(clusterResource, required,
+          node.getLabels(), this, true)) {
+        return LeafQueue.NULL_ASSIGNMENT;
+      }
+
+      // Check user limit
+      if (!leafQueue.assignToUser(clusterResource, getUser(), userLimit,
+          this, true, requestedNodeLabels)) {
+        break;
+      }
+
+      // Inform the application it is about to get a scheduling opportunity
+      addSchedulingOpportunity(priority);
+
+      // Try to schedule
+      CSAssignment assignment =
+          assignContainersOnNode(clusterResource, node, priority,
+              null, needToUnreserve);
+
+      // Did the application skip this node?
+      if (assignment.getSkipped()) {
+        // Don't count 'skipped nodes' as a scheduling opportunity!
+        subtractSchedulingOpportunity(priority);
+        continue;
+      }
+
+      // Did we schedule or reserve a container?
+      Resource assigned = assignment.getResource();
+      if (Resources.greaterThan(
+          resourceCalculator, clusterResource, assigned, Resources.none())) {
+        // Don't reset scheduling opportunities for non-local assignments
+        // otherwise the app will be delayed for each non-local assignment.
+        // This helps apps with many off-cluster requests schedule faster.
+        if (assignment.getType() != NodeType.OFF_SWITCH) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Resetting scheduling opportunities");
+          }
+          resetSchedulingOpportunities(priority);
+        }
+
+        // Done
+        return assignment;
+      } else {
+        // Do not assign out of order w.r.t priorities
+        break;
+      }
+    }
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("post-assignContainers for application "
+          + getApplicationId());
+      showRequests();
+    }
+
+    return LeafQueue.NULL_ASSIGNMENT;
+  }
+
+  private CSAssignment assignContainersOnNode(Resource clusterResource,
+      FiCaSchedulerNode node, Priority priority,
+      RMContainer reservedContainer, boolean needToUnreserve) {
+    Resource assigned = Resources.none();
+
+    // Data-local
+    ResourceRequest nodeLocalResourceRequest =
+        getResourceRequest(priority, node.getNodeName());
+    if (nodeLocalResourceRequest != null) {
+      assigned =
+          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
+              node, priority, reservedContainer, needToUnreserve);
+      if (Resources.greaterThan(resourceCalculator, clusterResource,
+          assigned, Resources.none())) {
+        return new CSAssignment(assigned, NodeType.NODE_LOCAL);
+      }
+    }
+
+    // Rack-local
+    ResourceRequest rackLocalResourceRequest =
+        getResourceRequest(priority, node.getRackName());
+    if (rackLocalResourceRequest != null) {
+      if (!rackLocalResourceRequest.getRelaxLocality()) {
+        return LeafQueue.SKIP_ASSIGNMENT;
+      }
+
+      assigned =
+          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
+              node, priority, reservedContainer, needToUnreserve);
+      if (Resources.greaterThan(resourceCalculator, clusterResource,
+          assigned, Resources.none())) {
+        return new CSAssignment(assigned, NodeType.RACK_LOCAL);
+      }
+    }
+
+    // Off-switch
+    ResourceRequest offSwitchResourceRequest =
+        getResourceRequest(priority, ResourceRequest.ANY);
+    if (offSwitchResourceRequest != null) {
+      if (!offSwitchResourceRequest.getRelaxLocality()) {
+        return LeafQueue.SKIP_ASSIGNMENT;
+      }
+
+      return new CSAssignment(
+          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
+              node, priority, reservedContainer, needToUnreserve),
+          NodeType.OFF_SWITCH);
+    }
+
+    return LeafQueue.SKIP_ASSIGNMENT;
+  }
+
+  public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
       Priority priority, ResourceRequest request,
       Container container) {

@@ -178,7 +781,8 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
     return rmContainer;
   }

-  public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
+  @VisibleForTesting
+  public boolean unreserveResource(FiCaSchedulerNode node, Priority priority) {
     Map<NodeId, RMContainer> reservedContainers =
         this.reservedContainers.get(priority);

@@ -210,7 +814,7 @@ public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority)
     return false;
   }

-  public synchronized float getLocalityWaitFactor(
+  private float getLocalityWaitFactor(
      Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources =
@@ -273,7 +877,8 @@ public synchronized Allocation getAllocation(ResourceCalculator rc,
         allocation.getNMTokenList());
   }

-  synchronized public NodeId getNodeIdToUnreserve(Priority priority,
+  @VisibleForTesting
+  public synchronized NodeId getNodeIdToUnreserve(Priority priority,
       Resource capability) {

     // first go around make this algorithm simple and just grab first
@@ -323,6 +928,9 @@ public synchronized void transferStateFromPreviousAttempt(
     this.headroomProvider =
         ((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
   }
-
-
+
+  @VisibleForTesting
+  public synchronized void setMinimumAllocationFactor(float factor) {
+    this.minimumAllocationFactor = factor;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 81a5aad..0def72d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -21,12 +21,9 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

 import org.mockito.Matchers;
 import org.mockito.Mockito;
@@ -545,30 +542,30 @@ public void testHeadroom() throws Exception {
     setupQueueConfiguration(csConf);
     YarnConfiguration conf = new YarnConfiguration();

-    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getMinimumResourceCapability()).
+    CapacityScheduler cs = mock(CapacityScheduler.class);
+    when(cs.getConfiguration()).thenReturn(csConf);
+    when(cs.getConf()).thenReturn(conf);
+    when(cs.getMinimumResourceCapability()).
         thenReturn(Resources.createResource(GB));
-    when(csContext.getMaximumResourceCapability()).
+    when(cs.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB));
-    when(csContext.getApplicationComparator()).
+    when(cs.getApplicationComparator()).
         thenReturn(CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).
+    when(cs.getQueueComparator()).
         thenReturn(CapacityScheduler.queueComparator);
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getRMContext()).thenReturn(rmContext);
+    when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
+    when(cs.getRMContext()).thenReturn(rmContext);

     // Say cluster has 100 nodes of 16G each
     Resource clusterResource = Resources.createResource(100 * 16 * GB);
-    when(csContext.getClusterResource()).thenReturn(clusterResource);
+    when(cs.getClusterResource()).thenReturn(clusterResource);

     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
-    CapacityScheduler.parseQueue(csContext, csConf, null, "root",
+    CapacityScheduler.parseQueue(cs, csConf, null, "root",
         queues, queues, TestUtils.spyHook);

     // Manipulate queue 'a'
-    LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue queue = (LeafQueue)queues.get(A);

     String host_0 = "host_0";
     String rack_0 = "rack_0";
@@ -579,19 +576,7 @@ public void testHeadroom() throws Exception {
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

-    RMContext rmContext = TestUtils.getMockRMContext();
-    RMContext spyRMContext = spy(rmContext);
-
-    ConcurrentMap<ApplicationId, RMApp> spyApps =
-        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
-    RMApp rmApp = mock(RMApp.class);
-    ResourceRequest amResourceRequest = mock(ResourceRequest.class);
-    Resource amResource = Resources.createResource(0, 0);
-    when(amResourceRequest.getCapability()).thenReturn(amResource);
-    when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
-    Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
-    when(spyRMContext.getRMApps()).thenReturn(spyApps);
-
+    RMContext rmContext = TestUtils.getMockRMContext(cs);

     Priority priority_1 = TestUtils.createMockPriority(1);

@@ -601,7 +586,7 @@
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
         appAttemptId_0_0, user_0, queue,
-        queue.getActiveUsersManager(), spyRMContext);
+        queue.getActiveUsersManager(), rmContext);
     queue.submitApplicationAttempt(app_0_0, user_0);

     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -620,7 +605,7 @@ public void testHeadroom() throws Exception {
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
         appAttemptId_0_1, user_0, queue,
-        queue.getActiveUsersManager(), spyRMContext);
+        queue.getActiveUsersManager(), rmContext);
     queue.submitApplicationAttempt(app_0_1, user_0);

     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -639,7 +624,7 @@ public void testHeadroom() throws Exception {
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
         appAttemptId_1_0, user_1, queue,
-        queue.getActiveUsersManager(), spyRMContext);
+        queue.getActiveUsersManager(), rmContext);
     queue.submitApplicationAttempt(app_1_0, user_1);

     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index ead5719..5bed417 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -22,10 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -52,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; @@ -89,13 +85,9 @@ import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; public class TestLeafQueue { - private static final Log LOG = LogFactory.getLog(TestLeafQueue.class); - private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -117,8 +109,9 @@ @Before public void setUp() throws Exception { CapacityScheduler spyCs = new CapacityScheduler(); + spyCs.setConf(new CapacitySchedulerConfiguration()); cs = spy(spyCs); - rmContext = TestUtils.getMockRMContext(); + rmContext = TestUtils.getMockRMContext(cs); spyRMContext = spy(rmContext); ConcurrentMap spyApps = @@ -140,27 +133,26 @@ public void setUp() throws Exception { YarnConfiguration conf = new YarnConfiguration(); cs.setConf(conf); - csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(16*GB, 32)); - when(csContext.getClusterResource()). - thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); - when(csContext.getApplicationComparator()). - thenReturn(CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()). - thenReturn(CapacityScheduler.queueComparator); - when(csContext.getResourceCalculator()). 
-        thenReturn(resourceCalculator);
-    when(csContext.getRMContext()).thenReturn(rmContext);
+    csContext = cs;
+    doReturn(csConf).when(cs).getConfiguration();
+    doReturn(conf).when(cs).getConf();
+    doReturn(Resources.createResource(GB, 1)).when(cs)
+        .getMinimumResourceCapability();
+    doReturn(Resources.createResource(16 * GB, 32)).when(cs)
+        .getMaximumResourceCapability();
+    doReturn(Resources.createResource(100 * 16 * GB, 100 * 32)).when(cs)
+        .getClusterResource();
+    doReturn(CapacityScheduler.applicationComparator).when(cs)
+        .getApplicationComparator();
+    doReturn(CapacityScheduler.queueComparator).when(cs).getQueueComparator();
+    doReturn(resourceCalculator).when(cs).getResourceCalculator();
+    doReturn(rmContext).when(cs).getRMContext();
 
     RMContainerTokenSecretManager containerTokenSecretManager =
         new RMContainerTokenSecretManager(conf);
     containerTokenSecretManager.rollMasterKey();
-    when(csContext.getContainerTokenSecretManager()).thenReturn(
-        containerTokenSecretManager);
+    doReturn(containerTokenSecretManager).when(cs)
+        .getContainerTokenSecretManager();
+
+    cs.setRMContext(spyRMContext);
 
     root =
         CapacityScheduler.parseQueue(csContext, csConf, null,
@@ -168,7 +160,6 @@ public void setUp() throws Exception {
             queues, queues,
             TestUtils.spyHook);
 
-    cs.setRMContext(spyRMContext);
     cs.init(csConf);
     cs.start();
   }
@@ -226,88 +217,47 @@ private void setupQueueConfiguration(
     conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
   }
-
-  static LeafQueue stubLeafQueue(LeafQueue queue) {
-
-    // Mock some methods for ease in these unit tests
-
-    // 1. LeafQueue.createContainer to return dummy containers
-    doAnswer(
-        new Answer<Container>() {
-          @Override
-          public Container answer(InvocationOnMock invocation)
-              throws Throwable {
-            final FiCaSchedulerApp application =
-                (FiCaSchedulerApp)(invocation.getArguments()[0]);
-            final ContainerId containerId =
-                TestUtils.getMockContainerId(application);
-
-            Container container = TestUtils.getMockContainer(
-                containerId,
-                ((FiCaSchedulerNode)(invocation.getArguments()[1])).getNodeID(),
-                (Resource)(invocation.getArguments()[2]),
-                ((Priority)invocation.getArguments()[3]));
-            return container;
-          }
-        }
-      ).
-    when(queue).createContainer(
-        any(FiCaSchedulerApp.class),
-        any(FiCaSchedulerNode.class),
-        any(Resource.class),
-        any(Priority.class)
-        );
-
-    // 2. Stub out LeafQueue.parent.completedContainer
-    CSQueue parent = queue.getParent();
-    doNothing().when(parent).completedContainer(
-        any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
-        any(RMContainer.class), any(ContainerStatus.class),
-        any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());
-
-    return queue;
-  }
 
   @Test
   public void testInitializeQueue() throws Exception {
-    final float epsilon = 1e-5f;
-    //can add more sturdy test with 3-layer queues
-    //once MAPREDUCE:3410 is resolved
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
-    assertEquals(0.085, a.getCapacity(), epsilon);
-    assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
-    assertEquals(0.2, a.getMaximumCapacity(), epsilon);
-    assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
-
-    LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
-    assertEquals(0.80, b.getCapacity(), epsilon);
-    assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
-    assertEquals(0.99, b.getMaximumCapacity(), epsilon);
-    assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
-
-    ParentQueue c = (ParentQueue)queues.get(C);
-    assertEquals(0.015, c.getCapacity(), epsilon);
-    assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
-    assertEquals(0.1, c.getMaximumCapacity(), epsilon);
-    assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
-
-    //Verify the value for getAMResourceLimit for queues with < .1 maxcap
-    Resource clusterResource = Resource.newInstance(50 * GB, 50);
+    final float epsilon = 1e-5f;
+    //can add more sturdy test with 3-layer queues
+    //once MAPREDUCE:3410 is resolved
+    LeafQueue a = (LeafQueue)queues.get(A);
+    assertEquals(0.085, a.getCapacity(), epsilon);
+    assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.2, a.getMaximumCapacity(), epsilon);
+    assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
 
-    a.updateClusterResource(clusterResource);
-    assertEquals(Resource.newInstance(1 * GB, 1),
-        a.getAMResourceLimit());
+    LeafQueue b = (LeafQueue)queues.get(B);
+    assertEquals(0.80, b.getCapacity(), epsilon);
+    assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.99, b.getMaximumCapacity(), epsilon);
+    assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
+
+    ParentQueue c = (ParentQueue)queues.get(C);
+    assertEquals(0.015, c.getCapacity(), epsilon);
+    assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.1, c.getMaximumCapacity(), epsilon);
+    assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+
+    //Verify the value for getAMResourceLimit for queues with < .1 maxcap
+    Resource clusterResource = Resource.newInstance(50 * GB, 50);
+
+    a.updateClusterResource(clusterResource);
+    assertEquals(Resource.newInstance(1 * GB, 1),
+        a.getAMResourceLimit());
 
-    b.updateClusterResource(clusterResource);
-    assertEquals(Resource.newInstance(5 * GB, 1),
-        b.getAMResourceLimit());
+    b.updateClusterResource(clusterResource);
+    assertEquals(Resource.newInstance(5 * GB, 1),
+        b.getAMResourceLimit());
   }
 
   @Test
   public void testSingleQueueOneUserMetrics() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
+    LeafQueue a = (LeafQueue)queues.get(B);
 
     // Users
     final String user_0 = "user_0";
@@ -357,7 +307,7 @@ public void testSingleQueueOneUserMetrics() throws Exception {
   public void testUserQueueAcl() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue d = stubLeafQueue((LeafQueue) queues.get(D));
+    LeafQueue d = (LeafQueue) queues.get(D);
 
     // Users
     final String user_d = "user_d";
@@ -382,7 +332,7 @@ public void testUserQueueAcl() throws Exception {
   public void testAppAttemptMetrics() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+    LeafQueue a = (LeafQueue) queues.get(B);
 
     // Users
     final String user_0 = "user_0";
@@ -436,7 +386,7 @@ public void testAppAttemptMetrics() throws Exception {
   public void testSingleQueueWithOneUser() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
@@ -569,7 +519,7 @@ public void testSingleQueueWithOneUser() throws Exception {
   @Test
   public void testUserLimits() throws Exception {
     // Mock the queue
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
@@ -662,7 +612,7 @@ public void testUserLimits() throws Exception {
   @Test
   public void testComputeUserLimitAndSetHeadroom(){
-    LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
+    LeafQueue qb = (LeafQueue)queues.get(B);
     qb.setMaxCapacity(1.0f);
     // Users
     final String user_0 = "user_0";
@@ -809,7 +759,7 @@ public void testComputeUserLimitAndSetHeadroom(){
   @Test
   public void testUserHeadroomMultiApp() throws Exception {
     // Mock the queue
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
@@ -843,9 +793,6 @@ public void testUserHeadroomMultiApp() throws Exception {
     String host_0 = "127.0.0.1";
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK,
       0, 16*GB);
-    String host_1 = "127.0.0.2";
-    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK,
-      0, 16*GB);
 
     final int numNodes = 2;
     Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1);
@@ -897,7 +844,7 @@ public void testUserHeadroomMultiApp() throws Exception {
   @Test
   public void testHeadroomWithMaxCap() throws Exception {
     // Mock the queue
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
@@ -1008,7 +955,7 @@ public void testHeadroomWithMaxCap() throws Exception {
   public void testSingleQueueWithMultipleUsers() throws Exception {
 
     // Mock the queue
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
@@ -1194,7 +1141,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
   public void testReservation() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
@@ -1302,7 +1249,7 @@ public void testReservation() throws Exception {
   @Test
   public void testStolenReservedContainer() throws Exception {
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
@@ -1377,6 +1324,8 @@ public void testStolenReservedContainer() throws Exception {
     // node_1 heartbeats in and gets the DEFAULT_RACK request for app_1
     // We do not need locality delay here
     doReturn(-1).when(a).getNodeLocalityDelay();
+    csContext.getConfiguration().setInt(
+        CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, -1);
 
     a.assignContainers(clusterResource, node_1, false);
     assertEquals(10*GB, a.getUsedResources().getMemory());
@@ -1408,10 +1357,19 @@ public void testStolenReservedContainer() throws Exception {
   public void testReservationExchange() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
     a.setUserLimitFactor(10);
+
+    final int numNodes = 3;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (4*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(4*GB, 16));
+    when(a.getMaximumAllocation()).thenReturn(
+        Resources.createResource(4*GB, 16));
 
     // Users
     final String user_0 = "user_0";
@@ -1431,6 +1389,11 @@ public void testReservationExchange() throws Exception {
         new FiCaSchedulerApp(appAttemptId_1, user_1, a,
            mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_1);
+
+    // Manipulate app_0/app_1's minimum-allocation-factor so that we can reach
+    // re-reservation faster
+    app_0.setMinimumAllocationFactor(0.25f);
+    app_1.setMinimumAllocationFactor(0.25f);
 
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -1439,16 +1402,6 @@ public void testReservationExchange() throws Exception {
     String host_1 = "127.0.0.2";
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
 
-    final int numNodes = 3;
-    Resource clusterResource =
-        Resources.createResource(numNodes * (4*GB), numNodes * 16);
-    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
-    when(csContext.getMaximumResourceCapability()).thenReturn(
-        Resources.createResource(4*GB, 16));
-    when(a.getMaximumAllocation()).thenReturn(
-        Resources.createResource(4*GB, 16));
-    when(a.getMinimumAllocationFactor()).thenReturn(0.25f); // 1G / 4G
-
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
@@ -1540,7 +1493,7 @@ public void testReservationExchange() throws Exception {
   public void testLocalityScheduling() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
 
     // User
     String user_0 = "user_0";
@@ -1661,7 +1614,8 @@ public void testLocalityScheduling() throws Exception {
     FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
 
     // Rack-delay
-    doReturn(1).when(a).getNodeLocalityDelay();
+    csContext.getConfiguration().setInt(
+        CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 1);
 
     // Shouldn't assign RACK_LOCAL yet
     assignment = a.assignContainers(clusterResource, node_3, false);
@@ -1681,7 +1635,7 @@ public void testLocalityScheduling() throws Exception {
   @Test
   public void testApplicationPriorityScheduling() throws Exception {
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
 
     // User
     String user_0 = "user_0";
@@ -1812,7 +1766,7 @@ public void testApplicationPriorityScheduling() throws Exception {
   public void testSchedulingConstraints() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
 
     // User
     String user_0 = "user_0";
@@ -1915,7 +1869,7 @@ public void testSchedulingConstraints() throws Exception {
   public void testActivateApplicationAfterQueueRefresh() throws Exception {
 
     // Manipulate queue 'e'
-    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
+    LeafQueue e = (LeafQueue)queues.get(E);
 
     // Users
     final String user_e = "user_e";
@@ -1971,7 +1925,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
   public void testNodeLocalityAfterQueueRefresh() throws Exception {
 
     // Manipulate queue 'e'
-    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
+    LeafQueue e = (LeafQueue)queues.get(E);
 
     // before reinitialization
     assertEquals(40, e.getNodeLocalityDelay());
@@ -1996,7 +1950,7 @@ public void testActivateApplicationByUpdatingClusterResource()
       throws Exception {
 
     // Manipulate queue 'e'
-    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
+    LeafQueue e = (LeafQueue)queues.get(E);
 
     // Users
     final String user_e = "user_e";
@@ -2050,10 +2004,10 @@ public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
   public void testInheritedQueueAcls() throws IOException {
     UserGroupInformation user = UserGroupInformation.getCurrentUser();
 
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
-    LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
+    LeafQueue a = (LeafQueue)queues.get(A);
+    LeafQueue b = (LeafQueue)queues.get(B);
     ParentQueue c = (ParentQueue)queues.get(C);
-    LeafQueue c1 = stubLeafQueue((LeafQueue)queues.get(C1));
+    LeafQueue c1 = (LeafQueue)queues.get(C1);
 
     assertFalse(root.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
     assertTrue(a.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
@@ -2076,7 +2030,7 @@ public void testInheritedQueueAcls() throws IOException {
   public void testLocalityConstraints() throws Exception {
 
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    LeafQueue a = (LeafQueue)queues.get(A);
 
     // User
     String user_0 = "user_0";
@@ -2332,7 +2286,7 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
   public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
      throws Exception {
     // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+    LeafQueue a = (LeafQueue) queues.get(B);
 
     // Users
     final String user_0 = "user_0";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 985609e..5a5d6bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -23,8 +23,8 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -66,8 +66,6 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 public class TestReservations {
@@ -95,7 +93,7 @@ public void setUp() throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
     cs = spy(spyCs);
-    rmContext = TestUtils.getMockRMContext();
+    rmContext = TestUtils.getMockRMContext(cs);
   }
@@ -109,26 +107,27 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
 
-    csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getMinimumResourceCapability()).thenReturn(
-        Resources.createResource(GB, 1));
-    when(csContext.getMaximumResourceCapability()).thenReturn(
-        Resources.createResource(16 * GB, 12));
-    when(csContext.getClusterResource()).thenReturn(
-        Resources.createResource(100 * 16 * GB, 100 * 12));
-    when(csContext.getApplicationComparator()).thenReturn(
-        CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).thenReturn(
-        CapacityScheduler.queueComparator);
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getRMContext()).thenReturn(rmContext);
-    RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
-        conf);
+    csContext = cs;
+    doReturn(csConf).when(csContext).getConfiguration();
+    doReturn(conf).when(csContext).getConf();
+    doReturn(Resources.createResource(GB, 1)).when(csContext)
+        .getMinimumResourceCapability();
+    doReturn(Resources.createResource(16 * GB, 12)).when(csContext)
+        .getMaximumResourceCapability();
+    doReturn(Resources.createResource(100 * 16 * GB, 100 * 12)).when(csContext)
+        .getClusterResource();
+    doReturn(CapacityScheduler.applicationComparator).when(csContext)
+        .getApplicationComparator();
+    doReturn(CapacityScheduler.queueComparator).when(csContext)
+        .getQueueComparator();
+    doReturn(resourceCalculator).when(csContext).getResourceCalculator();
+    doReturn(rmContext).when(csContext).getRMContext();
+
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(conf);
     containerTokenSecretManager.rollMasterKey();
-    when(csContext.getContainerTokenSecretManager()).thenReturn(
-        containerTokenSecretManager);
+    doReturn(containerTokenSecretManager).when(csContext)
+        .getContainerTokenSecretManager();
 
     root = CapacityScheduler.parseQueue(csContext, csConf, null,
         CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
@@ -171,25 +170,7 @@ static LeafQueue stubLeafQueue(LeafQueue queue) {
 
     // Mock some methods for ease in these unit tests
 
-    // 1. LeafQueue.createContainer to return dummy containers
-    doAnswer(new Answer<Container>() {
-      @Override
-      public Container answer(InvocationOnMock invocation) throws Throwable {
-        final FiCaSchedulerApp application = (FiCaSchedulerApp) (invocation
-            .getArguments()[0]);
-        final ContainerId containerId = TestUtils
-            .getMockContainerId(application);
-
-        Container container = TestUtils.getMockContainer(containerId,
-            ((FiCaSchedulerNode) (invocation.getArguments()[1])).getNodeID(),
-            (Resource) (invocation.getArguments()[2]),
-            ((Priority) invocation.getArguments()[3]));
-        return container;
-      }
-    }).when(queue).createContainer(any(FiCaSchedulerApp.class),
-        any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class));
-
-    // 2. Stub out LeafQueue.parent.completedContainer
+    // 1. Stub out LeafQueue.parent.completedContainer
     CSQueue parent = queue.getParent();
     doNothing().when(parent).completedContainer(any(Resource.class),
         any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
@@ -664,7 +645,7 @@ public void testGetAppToUnreserve() throws Exception {
 
     // no reserved containers - reserve then unreserve
     app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
-    app_0.unreserve(node_0, priorityMap);
+    app_0.unreserveResource(node_0, priorityMap);
     unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
     assertEquals(null, unreserveId);
@@ -718,14 +699,14 @@ public void testFindNodeToUnreserve() throws Exception {
         node_1.getNodeID(), "user", rmContext);
 
     // nothing reserved
-    boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
-        node_1, app_0, priorityMap, capability);
+    boolean res = app_0.findNodeToUnreserve(csContext.getClusterResource(),
+        node_1, priorityMap, capability);
     assertFalse(res);
 
     // reserved but scheduler doesn't know about that node.
    app_0.reserve(node_1, priorityMap, rmContainer, container);
    node_1.reserveResource(app_0, priorityMap, rmContainer);
-    res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
+    res = app_0.findNodeToUnreserve(csContext.getClusterResource(), node_1,
        priorityMap, capability);
     assertFalse(res);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 9e352a7..c6e071f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -67,8 +67,16 @@
    * Get a mock {@link RMContext} for use in test cases.
    * @return a mock {@link RMContext} for use in test cases
    */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
   public static RMContext getMockRMContext() {
+    return getMockRMContext(null);
+  }
+
+  /**
+   * Get a mock {@link RMContext} for use in test cases.
+   * @return a mock {@link RMContext} for use in test cases
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static RMContext getMockRMContext(CapacityScheduler cs) {
     // Null dispatcher
     Dispatcher nullDispatcher = new Dispatcher() {
       private final EventHandler handler =
@@ -98,7 +106,7 @@ public EventHandler getEventHandler() {
         new AMRMTokenSecretManager(conf, null),
         new RMContainerTokenSecretManager(conf),
         new NMTokenSecretManagerInRM(conf),
-        new ClientToAMTokenSecretManagerInRM(), writer);
+        new ClientToAMTokenSecretManagerInRM(), writer, cs);
     RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
     when(
         nlm.getQueueResource(any(String.class), any(Set.class),
@@ -162,18 +170,14 @@ public static ResourceRequest createResourceRequest(
   }
 
   public static ApplicationId getMockApplicationId(int appId) {
-    ApplicationId applicationId = mock(ApplicationId.class);
-    when(applicationId.getClusterTimestamp()).thenReturn(0L);
-    when(applicationId.getId()).thenReturn(appId);
-    return applicationId;
+    return BuilderUtils.newApplicationId(0L, appId);
   }
 
-  public static ApplicationAttemptId
-      getMockApplicationAttemptId(int appId, int attemptId) {
+  public static ApplicationAttemptId getMockApplicationAttemptId(int appId,
+      int attemptId) {
     ApplicationId applicationId = BuilderUtils.newApplicationId(0l, appId);
-    ApplicationAttemptId applicationAttemptId = mock(ApplicationAttemptId.class);
-    when(applicationAttemptId.getApplicationId()).thenReturn(applicationId);
-    when(applicationAttemptId.getAttemptId()).thenReturn(attemptId);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, attemptId);
     return applicationAttemptId;
   }