Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (revision 1559583) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (working copy) @@ -49,7 +49,7 @@ = new DefaultResourceCalculator(); private FairScheduler scheduler; - private FSSchedulerApp app; + private FSSchedulerAppAttempt app; private Resource demand = Resources.createResource(0); private long startTime; private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -57,7 +57,7 @@ private FSLeafQueue queue; private RMContainerTokenSecretManager containerTokenSecretManager; - public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) { + public AppSchedulable(FairScheduler scheduler, FSSchedulerAppAttempt app, FSLeafQueue queue) { this.scheduler = scheduler; this.app = app; this.startTime = scheduler.getClock().getTime(); @@ -71,7 +71,7 @@ return app.getApplicationId().toString(); } - public FSSchedulerApp getApp() { + public FSSchedulerAppAttempt getApp() { return app; } @@ -143,7 +143,7 @@ * priority. 
*/ public Container createContainer( - FSSchedulerApp application, FSSchedulerNode node, + FSSchedulerAppAttempt application, FSSchedulerNode node, Resource capability, Priority priority) { NodeId nodeId = node.getRMNode().getNodeID(); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (revision 1559583) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (working copy) @@ -61,7 +61,7 @@ this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); } - public void addApp(FSSchedulerApp app, boolean runnable) { + public void addApp(FSSchedulerAppAttempt app, boolean runnable) { AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this); app.setAppSchedulable(appSchedulable); if (runnable) { @@ -80,7 +80,7 @@ * Removes the given app from this queue. 
* @return whether or not the app was runnable */ - public boolean removeApp(FSSchedulerApp app) { + public boolean removeApp(FSSchedulerAppAttempt app) { if (runnableAppScheds.remove(app.getAppSchedulable())) { return true; } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) { Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (deleted) =================================================================== Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerAppAttempt.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerAppAttempt.java (revision 0) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerAppAttempt.java (working copy) @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Represents an application attempt from the viewpoint of the Fair Scheduler. 
+ */ +@Private +@Unstable +public class FSSchedulerAppAttempt extends SchedulerApplicationAttempt { + + private static final Log LOG = LogFactory.getLog(FSSchedulerAppAttempt.class); + + private AppSchedulable appSchedulable; + + final Map preemptionMap = new HashMap(); + + public FSSchedulerAppAttempt(ApplicationAttemptId applicationAttemptId, + String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext) { + super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + } + + public void setAppSchedulable(AppSchedulable appSchedulable) { + this.appSchedulable = appSchedulable; + } + + public AppSchedulable getAppSchedulable() { + return appSchedulable; + } + + synchronized public void containerCompleted(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + + Container container = rmContainer.getContainer(); + ContainerId containerId = container.getId(); + + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent( + containerId, + containerStatus, + event) + ); + LOG.info("Completed container: " + rmContainer.getContainerId() + + " in state: " + rmContainer.getState() + " event:" + event); + + // Remove from the list of containers + liveContainers.remove(rmContainer.getContainerId()); + + RMAuditLogger.logSuccess(getUser(), + AuditConstants.RELEASE_CONTAINER, "SchedulerApp", + getApplicationId(), containerId); + + // Update usage metrics + Resource containerResource = rmContainer.getContainer().getResource(); + queue.getMetrics().releaseResources(getUser(), 1, containerResource); + Resources.subtractFrom(currentConsumption, containerResource); + + // remove from preemption map if it is completed + preemptionMap.remove(rmContainer); + } + + public synchronized void unreserve(FSSchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); + 
if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(priority); + } + + // Reset the re-reservation count + resetReReservations(priority); + + Resource resource = reservedContainer.getContainer().getResource(); + Resources.subtractFrom(currentReservation, resource); + + LOG.info("Application " + getApplicationId() + " unreserved " + " on node " + + node + ", currently has " + reservedContainers.size() + " at priority " + + priority + "; currentReservation " + currentReservation); + } + + public synchronized float getLocalityWaitFactor( + Priority priority, int clusterNodes) { + // Estimate: Required unique resources (i.e. hosts + racks) + int requiredResources = + Math.max(this.getResourceRequests(priority).size() - 1, 0); + + // waitFactor can't be more than '1' + // i.e. no point skipping more than clustersize opportunities + return Math.min(((float)requiredResources / clusterNodes), 1.0f); + } + + /** + * Delay scheduling: We often want to prioritize scheduling of node-local + * containers over rack-local or off-switch containers. To achieve this + * we first only allow node-local assignments for a given priority level, + * then relax the locality threshold once we've had a long enough period + * without successfully scheduling. We measure both the number of "missed" + * scheduling opportunities since the last container was scheduled + * at the current allowed level and the time since the last container + * was scheduled. Currently we use only the former. + */ + + // Current locality threshold + final Map allowedLocalityLevel = new HashMap< + Priority, NodeType>(); + + /** + * Return the level at which we are allowed to schedule containers, given the + * current size of the cluster and thresholds indicating how many nodes to + * fail at (as a fraction of cluster size) before relaxing scheduling + * constraints. 
+ */ + public synchronized NodeType getAllowedLocalityLevel(Priority priority, + int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) { + // upper limit on threshold + if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; } + if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; } + + // If delay scheduling is not being used, can schedule anywhere + if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) { + return NodeType.OFF_SWITCH; + } + + // Default level is NODE_LOCAL + if (!allowedLocalityLevel.containsKey(priority)) { + allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } + + NodeType allowed = allowedLocalityLevel.get(priority); + + // If level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH; + + double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold : + rackLocalityThreshold; + + // Relax locality constraints once we've surpassed threshold. + if (getSchedulingOpportunities(priority) > (numNodes * threshold)) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(priority); + } + else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(priority); + } + } + return allowedLocalityLevel.get(priority); + } + + /** + * Return the level at which we are allowed to schedule containers. + * Given the thresholds indicating how much time passed before relaxing + * scheduling constraints. + */ + public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, + long nodeLocalityDelayMs, long rackLocalityDelayMs, + long currentTimeMs) { + + // if not being used, can schedule anywhere + if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { + return NodeType.OFF_SWITCH; + } + + // default level is NODE_LOCAL + if (! 
allowedLocalityLevel.containsKey(priority)) { + allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } + + NodeType allowed = allowedLocalityLevel.get(priority); + + // if level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) { + return NodeType.OFF_SWITCH; + } + + // check waiting time + long waitTime = currentTimeMs; + if (lastScheduledContainer.containsKey(priority)) { + waitTime -= lastScheduledContainer.get(priority); + } else { + waitTime -= appSchedulable.getStartTime(); + } + + long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityDelayMs : rackLocalityDelayMs; + + if (waitTime > thresholdTime) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(priority, currentTimeMs); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(priority, currentTimeMs); + } + } + return allowedLocalityLevel.get(priority); + } + + synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, + Priority priority, ResourceRequest request, + Container container) { + // Update allowed locality level + NodeType allowed = allowedLocalityLevel.get(priority); + if (allowed != null) { + if (allowed.equals(NodeType.OFF_SWITCH) && + (type.equals(NodeType.NODE_LOCAL) || + type.equals(NodeType.RACK_LOCAL))) { + this.resetAllowedLocalityLevel(priority, type); + } + else if (allowed.equals(NodeType.RACK_LOCAL) && + type.equals(NodeType.NODE_LOCAL)) { + this.resetAllowedLocalityLevel(priority, type); + } + } + + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(priority) <= 0) { + return null; + } + + // Create RMContainer + RMContainer rmContainer = new RMContainerImpl(container, + getApplicationAttemptId(), 
node.getNodeID(), rmContext + .getDispatcher().getEventHandler(), rmContext + .getContainerAllocationExpirer()); + + // Add it to allContainers list. + newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); + + // Update consumption and track allocations + appSchedulingInfo.allocate(type, node, priority, request, container); + Resources.addTo(currentConsumption, container.getResource()); + + // Inform the container + rmContainer.handle( + new RMContainerEvent(container.getId(), RMContainerEventType.START)); + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + + container.getId().getApplicationAttemptId() + + " container=" + container.getId() + " host=" + + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), + AuditConstants.ALLOC_CONTAINER, "SchedulerApp", + getApplicationId(), container.getId()); + + return rmContainer; + } + + /** + * Should be called when the scheduler assigns a container at a higher + * degree of locality than the current threshold. Reset the allowed locality + * level to a higher degree of locality. 
+ */ + public synchronized void resetAllowedLocalityLevel(Priority priority, + NodeType level) { + NodeType old = allowedLocalityLevel.get(priority); + LOG.info("Raising locality level from " + old + " to " + level + " at " + + " priority " + priority); + allowedLocalityLevel.put(priority, level); + } + + // related methods + public void addPreemption(RMContainer container, long time) { + assert preemptionMap.get(container) == null; + preemptionMap.put(container, time); + } + + public Long getContainerPreemptionTime(RMContainer container) { + return preemptionMap.get(container); + } + + public Set getPreemptionContainers() { + return preemptionMap.keySet(); + } + + @Override + public FSLeafQueue getQueue() { + return (FSLeafQueue)super.getQueue(); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (revision 1559583) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (working copy) @@ -209,7 +209,7 @@ } public synchronized void reserveResource( - FSSchedulerApp application, Priority priority, + FSSchedulerAppAttempt application, Priority priority, RMContainer reservedContainer) { // Check if it's already reserved if (this.reservedContainer != null) { @@ -245,7 +245,7 @@ } public synchronized void unreserveResource( - FSSchedulerApp application) { + FSSchedulerAppAttempt application) { // Cannot unreserve for wrong application... 
ApplicationAttemptId reservedApplication = reservedContainer.getContainer().getId().getApplicationAttemptId(); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (revision 1559583) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (working copy) @@ -254,17 +254,17 @@ @Override public RMContainer getRMContainer(ContainerId containerId) { - FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId); + FSSchedulerAppAttempt attempt = getCurrentAttemptForContainer(containerId); return (attempt == null) ? 
null : attempt.getRMContainer(containerId); } - private FSSchedulerApp getCurrentAttemptForContainer( + private FSSchedulerAppAttempt getCurrentAttemptForContainer( ContainerId containerId) { SchedulerApplication app = applications.get(containerId.getApplicationAttemptId() .getApplicationId()); if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); + return (FSSchedulerAppAttempt) app.getCurrentAppAttempt(); } return null; } @@ -387,8 +387,8 @@ return; } - Map apps = - new HashMap(); + Map apps = + new HashMap(); Map queues = new HashMap(); @@ -456,7 +456,7 @@ } } - private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, + private void warnOrKillContainer(RMContainer container, FSSchedulerAppAttempt app, FSLeafQueue queue) { LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + @@ -480,7 +480,7 @@ (clock.getTime() - time) + "ms)"); } } else { - // track the request in the FSSchedulerApp itself + // track the request in the FSSchedulerAppAttempt itself app.addPreemption(container, clock.getTime()); } } @@ -659,8 +659,8 @@ String user = application.getUser(); FSLeafQueue queue = (FSLeafQueue) application.getQueue(); - FSSchedulerApp attempt = - new FSSchedulerApp(applicationAttemptId, user, + FSSchedulerAppAttempt attempt = + new FSSchedulerAppAttempt(applicationAttemptId, user, queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); if (transferStateFromPreviousAttempt) { @@ -727,7 +727,7 @@ " finalState=" + rmAppAttemptFinalState); SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); - FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); + FSSchedulerAppAttempt attempt = getSchedulerApp(applicationAttemptId); if (attempt == null || application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); @@ -786,7 +786,7 @@ Container container = 
rmContainer.getContainer(); // Get the application for the finished container - FSSchedulerApp application = + FSSchedulerAppAttempt application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = container.getId().getApplicationAttemptId().getApplicationId(); @@ -862,7 +862,7 @@ List ask, List release, List blacklistAdditions, List blacklistRemovals) { // Make sure this application exists - FSSchedulerApp application = getSchedulerApp(appAttemptId); + FSSchedulerAppAttempt application = getSchedulerApp(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -932,7 +932,7 @@ */ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { // Get the application for the finished container - FSSchedulerApp application = getCurrentAttemptForContainer(containerId); + FSSchedulerAppAttempt application = getCurrentAttemptForContainer(containerId); if (application == null) { LOG.info("Unknown application " + containerId.getApplicationAttemptId().getApplicationId() @@ -1074,11 +1074,11 @@ return node == null ? 
null : new SchedulerNodeReport(node); } - public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { + public FSSchedulerAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { SchedulerApplication app = applications.get(appAttemptId.getApplicationId()); if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); + return (FSSchedulerAppAttempt) app.getCurrentAppAttempt(); } return null; } @@ -1086,7 +1086,7 @@ @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); + FSSchedulerAppAttempt attempt = getSchedulerApp(appAttemptId); if (attempt == null) { LOG.error("Request for appInfo of unknown attempt" + appAttemptId); return null; @@ -1097,7 +1097,7 @@ @Override public ApplicationResourceUsageReport getAppResourceUsageReport( ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); + FSSchedulerAppAttempt attempt = getSchedulerApp(appAttemptId); if (attempt == null) { LOG.error("Request for appInfo of unknown attempt" + appAttemptId); return null; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (revision 1559583) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (working copy) @@ -75,7 +75,7 @@ * Tracks the given new runnable app for purposes of maintaining max running * app limits. 
*/ - public void trackRunnableApp(FSSchedulerApp app) { + public void trackRunnableApp(FSSchedulerAppAttempt app) { String user = app.getUser(); FSLeafQueue queue = app.getQueue(); // Increment running counts for all parent queues @@ -94,7 +94,7 @@ * Tracks the given new non runnable app so that it can be made runnable when * it would not violate max running app limits. */ - public void trackNonRunnableApp(FSSchedulerApp app) { + public void trackNonRunnableApp(FSSchedulerAppAttempt app) { String user = app.getUser(); usersNonRunnableApps.put(user, app.getAppSchedulable()); } @@ -107,7 +107,7 @@ * Runs in O(n log(n)) where n is the number of queues that are under the * highest queue that went from having no slack to having slack. */ - public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) { + public void updateRunnabilityOnAppRemoval(FSSchedulerAppAttempt app) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); // Update usersRunnableApps @@ -160,12 +160,12 @@ } // Scan through and check whether this means that any apps are now runnable - Iterator iter = new MultiListStartTimeIterator( + Iterator iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); - FSSchedulerApp prev = null; + FSSchedulerAppAttempt prev = null; int numNowRunnable = 0; while (iter.hasNext()) { - FSSchedulerApp next = iter.next(); + FSSchedulerAppAttempt next = iter.next(); if (next == prev) { continue; } @@ -193,7 +193,7 @@ /** * Stops tracking the given non-runnable app */ - public void untrackNonRunnableApp(FSSchedulerApp app) { + public void untrackNonRunnableApp(FSSchedulerAppAttempt app) { usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); } @@ -226,7 +226,7 @@ * of O(num lists) time. 
*/ private static class MultiListStartTimeIterator implements - Iterator { + Iterator { private List[] appLists; private int[] curPositionsInAppLists; @@ -251,7 +251,7 @@ } @Override - public FSSchedulerApp next() { + public FSSchedulerAppAttempt next() { IndexAndTime indexAndTime = appListsByCurStartTime.remove(); int nextListIndex = indexAndTime.index; AppSchedulable next = appLists[nextListIndex] Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java (deleted) =================================================================== Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerAppAttempt.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerAppAttempt.java (revision 0) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerAppAttempt.java (working copy) @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.util.Clock; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestFSSchedulerAppAttempt { + + private class MockClock implements Clock { + private long time = 0; + @Override + public long getTime() { + return time; + } + + public void tick(int seconds) { + time = time + seconds * 1000; + } + + } + + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { + ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); + ApplicationAttemptId attId = + ApplicationAttemptId.newInstance(appIdImpl, attemptId); + return attId; + } + + @Test + public void testDelayScheduling() { + FSLeafQueue queue = Mockito.mock(FSLeafQueue.class); + Priority prio = Mockito.mock(Priority.class); + Mockito.when(prio.getPriority()).thenReturn(1); + double nodeLocalityThreshold = .5; + double rackLocalityThreshold = .6; + + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + FSSchedulerAppAttempt schedulerApp = + new FSSchedulerAppAttempt(applicationAttemptId, "user1", queue , null, null); + + // Default level should be node-local + assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, 
nodeLocalityThreshold, rackLocalityThreshold)); + + // First five scheduling opportunities should remain node local + for (int i = 0; i < 5; i++) { + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + } + + // After five it should switch to rack local + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + + // Manually set back to node local + schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL); + schedulerApp.resetSchedulingOpportunities(prio); + assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + + // Now escalate again to rack-local, then to off-switch + for (int i = 0; i < 5; i++) { + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + } + + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + + for (int i = 0; i < 6; i++) { + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + } + + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + } + + @Test + public void testDelaySchedulingForContinuousScheduling() + throws InterruptedException { + FSLeafQueue queue = Mockito.mock(FSLeafQueue.class); + Priority prio = Mockito.mock(Priority.class); + Mockito.when(prio.getPriority()).thenReturn(1); + + MockClock clock = 
new MockClock(); + + long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds + long rackLocalityDelayMs = 6 * 1000L; // 6 seconds + + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + FSSchedulerAppAttempt schedulerApp = + new FSSchedulerAppAttempt(applicationAttemptId, "user1", queue, + null, null); + AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class); + long startTime = clock.getTime(); + Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime); + schedulerApp.setAppSchedulable(appSchedulable); + + // Default level should be node-local + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // after 4 seconds should remain node local + clock.tick(4); + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // after 6 seconds should switch to rack local + clock.tick(2); + assertEquals(NodeType.RACK_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // manually set back to node local + schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL); + schedulerApp.resetSchedulingOpportunities(prio, clock.getTime()); + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // Now escalate again to rack-local, then to off-switch + clock.tick(6); + assertEquals(NodeType.RACK_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + clock.tick(7); + assertEquals(NodeType.OFF_SWITCH, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + } + + @Test + /** + * Ensure that when negative parameters are given (signaling delay scheduling + * no
t in use), the least restrictive locality level is returned. + */ + public void testLocalityLevelWithoutDelays() { + FSLeafQueue queue = Mockito.mock(FSLeafQueue.class); + Priority prio = Mockito.mock(Priority.class); + Mockito.when(prio.getPriority()).thenReturn(1); + + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + FSSchedulerAppAttempt schedulerApp = + new FSSchedulerAppAttempt(applicationAttemptId, "user1", queue , null, null); + assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( + prio, 10, -1.0, -1.0)); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (revision 1559583) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (working copy) @@ -1447,7 +1447,7 @@ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(1, app.getLiveContainers().size()); ContainerId containerId = scheduler.getSchedulerApp(attId) @@ -1519,9 +1519,9 @@ ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSSchedulerAppAttempt app1 = scheduler.getSchedulerApp(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSSchedulerAppAttempt app2 = 
scheduler.getSchedulerApp(attId2); assertNull("The application was allowed", app2); } @@ -1590,8 +1590,8 @@ "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSSchedulerAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSSchedulerAppAttempt app2 = scheduler.getSchedulerApp(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1631,7 +1631,7 @@ ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1693,10 +1693,10 @@ ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSSchedulerAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSSchedulerAppAttempt app2 = scheduler.getSchedulerApp(attId2); + FSSchedulerAppAttempt app3 = scheduler.getSchedulerApp(attId3); + FSSchedulerAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1811,7 +1811,7 @@ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -1880,7 +1880,7 @@ NodeUpdateSchedulerEvent 
node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1919,7 +1919,7 @@ NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1952,7 +1952,7 @@ ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -1992,7 +1992,7 @@ ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -2012,10 +2012,10 @@ ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSSchedulerAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSSchedulerAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy 
drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2053,13 +2053,13 @@ ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSSchedulerAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSSchedulerAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSSchedulerAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2090,19 +2090,19 @@ ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSSchedulerAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSSchedulerAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSSchedulerAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = 
scheduler.getSchedulerApp(appAttId4); + FSSchedulerAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2182,7 +2182,7 @@ NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2194,7 +2194,7 @@ } private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(attId); FSLeafQueue queue = app.getQueue(); Collection runnableApps = queue.getRunnableAppSchedulables(); @@ -2379,7 +2379,7 @@ // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.getSchedulerApp(appAttemptId); + FSSchedulerAppAttempt app = fs.getSchedulerApp(appAttemptId); // Wait until app gets resources. 
while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -2467,7 +2467,7 @@ ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSSchedulerAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.emptyList(), Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java (revision 1559583) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java (working copy) @@ -57,11 +57,11 @@ appNum = 0; } - private FSSchedulerApp addApp(FSLeafQueue queue, String user) { + private FSSchedulerAppAttempt addApp(FSLeafQueue queue, String user) { ApplicationId appId = ApplicationId.newInstance(0l, appNum++); ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); - FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null); + FSSchedulerAppAttempt app = new FSSchedulerAppAttempt(attId, user, queue, null, null); queue.addApp(app, runnable); if (runnable) { maxAppsEnforcer.trackRunnableApp(app); @@ -71,7 +71,7 @@ return app; } - private void removeApp(FSSchedulerApp app) { + private void removeApp(FSSchedulerAppAttempt app) { app.getQueue().removeApp(app); maxAppsEnforcer.updateRunnabilityOnAppRemoval(app); } @@ -83,7 +83,7 
@@ queueMaxApps.put("root", 2); queueMaxApps.put("root.queue1", 1); queueMaxApps.put("root.queue2", 1); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSSchedulerAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); assertEquals(1, leaf1.getRunnableAppSchedulables().size()); @@ -100,7 +100,7 @@ FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSSchedulerAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); assertEquals(1, leaf1.getRunnableAppSchedulables().size()); @@ -118,7 +118,7 @@ FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true); queueMaxApps.put("root.queue1.leaf1", 2); userMaxApps.put("user1", 1); - FSSchedulerApp app1 = addApp(leaf1, "user1"); + FSSchedulerAppAttempt app1 = addApp(leaf1, "user1"); addApp(leaf1, "user2"); addApp(leaf1, "user3"); addApp(leaf2, "user1"); @@ -137,7 +137,7 @@ FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSSchedulerAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); clock.tick(20);