diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index f923733..046b213 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -36,6 +36,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -872,4 +874,11 @@ public String moveApplication(ApplicationId appId, String newQueue) throws YarnException { return scheduler.moveApplication(appId, newQueue); } + + @Override + @LimitedPrivate("yarn") + @Unstable + public Resource getClusterResource() { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 7ea73d9..3c0acf3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -169,7 +169,7 @@ public void init(Configuration config, public void editSchedule(){ CSQueue root = scheduler.getRootQueue(); Resource clusterResources = - Resources.clone(scheduler.getClusterResources()); + Resources.clone(scheduler.getClusterResource()); containerBasedPreemptOrKill(root, clusterResources); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0f3af41..41e321b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -22,11 +22,17 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -35,6 +41,18 @@ public abstract class AbstractYarnScheduler implements ResourceScheduler { + private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); + + // Nodes in the cluster, indexed by NodeId + protected Map nodes = + new ConcurrentHashMap(); + + // Whole capacity of the cluster + protected Resource clusterResource = Resource.newInstance(0, 0); + + protected Resource minimumAllocation; + protected Resource maximumAllocation; + protected RMContext rmContext; protected Map applications; protected final static List EMPTY_CONTAINER_LIST = @@ -67,8 +85,81 @@ public Map getSchedulerApplications() { return applications; } + + @Override + public Resource getClusterResource() { + return clusterResource; + } + + @Override + public Resource getMinimumResourceCapability() { + return minimumAllocation; + } + + @Override + public Resource getMaximumResourceCapability() { + return maximumAllocation; + } + + public SchedulerApplicationAttempt getApplicationAttempt( + ApplicationAttemptId applicationAttemptId) { + SchedulerApplication app = + applications.get(applicationAttemptId.getApplicationId()); + return app == null ? null : app.getCurrentAppAttempt(); + } + + @Override + public SchedulerAppReport getSchedulerAppInfo( + ApplicationAttemptId appAttemptId) { + SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); + if (attempt == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); + } + return null; + } + return new SchedulerAppReport(attempt); + } @Override + public ApplicationResourceUsageReport getAppResourceUsageReport( + ApplicationAttemptId appAttemptId) { + SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); + if (attempt == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); + } + return null; + } + return attempt.getResourceUsageReport(); + } + + + protected SchedulerApplicationAttempt getCurrentAttemptForContainer( + ContainerId containerId) { + SchedulerApplication app = + applications.get(containerId.getApplicationAttemptId() + .getApplicationId()); + if (app != null) { + return (SchedulerApplicationAttempt) app.getCurrentAppAttempt(); + } + return null; + } + + @Override + public RMContainer getRMContainer(ContainerId containerId) { + SchedulerApplicationAttempt attempt = + getCurrentAttemptForContainer(containerId); + return (attempt == null) ? null : attempt.getRMContainer(containerId); + } + + @Override + public SchedulerNodeReport getNodeReport(NodeId nodeId) { + SchedulerNode node = nodes.get(nodeId); + return node == null ? null : new SchedulerNodeReport(node); + } + + @Override public String moveApplication(ApplicationId appId, String newQueue) throws YarnException { throw new YarnException(getClass().getSimpleName() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 524b1ab..d4374c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -18,72 +18,246 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.util.resource.Resources; -/** - * Represents a YARN Cluster Node from the viewpoint of the scheduler. - */ -@Private -@Unstable public abstract class SchedulerNode { + private static final Log LOG = LogFactory.getLog(SchedulerNode.class); + + private Resource availableResource = Resource.newInstance(0, 0); + private Resource usedResource = Resource.newInstance(0, 0); + private Resource totalResourceCapability; + protected RMContainer reservedContainer; + private volatile int numContainers; + + + /* set of containers that are allocated containers */ + private final Map launchedContainers = + new HashMap(); + + private final RMNode rmNode; + private final String nodeName; + + public SchedulerNode(RMNode node, boolean usePortForNodeName) { + this.rmNode = node; + this.availableResource = Resources.clone(node.getTotalCapability()); + this.totalResourceCapability = Resources.clone(node.getTotalCapability()); + if (usePortForNodeName) { + nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); + } else { + nodeName = rmNode.getHostName(); + } + } + + public RMNode getRMNode() { + return this.rmNode; + } + + /** + * Get the ID of the node which contains both its hostname and port. + * + * @return the ID of the node + */ + public NodeId getNodeID() { + return this.rmNode.getNodeID(); + } + + public String getHttpAddress() { + return this.rmNode.getHttpAddress(); + } + /** * Get the name of the node for scheduling matching decisions. *

- * Typically this is the 'hostname' reported by the node, but it could be - * configured to be 'hostname:port' reported by the node via the + * Typically this is the 'hostname' reported by the node, but it could be + * configured to be 'hostname:port' reported by the node via the * {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant. * The main usecase of this is Yarn minicluster to be able to differentiate * node manager instances by their port number. * * @return name of the node for scheduling matching decisions. */ - public abstract String getNodeName(); - + public String getNodeName() { + return nodeName; + } + /** * Get rackname. + * * @return rackname */ - public abstract String getRackName(); - + public String getRackName() { + return this.rmNode.getRackName(); + } + /** - * Get used resources on the node. - * @return used resources on the node + * The Scheduler has allocated containers on this node to the given + * application. + * + * @param applicationId + * application + * @param rmContainer + * allocated container */ - public abstract Resource getUsedResource(); + public synchronized void allocateContainer(ApplicationId applicationId, + RMContainer rmContainer) { + Container container = rmContainer.getContainer(); + deductAvailableResource(container.getResource()); + ++numContainers; + + launchedContainers.put(container.getId(), rmContainer); + + LOG.info("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + getAvailableResource() + + " available"); + } /** * Get available resources on the node. + * * @return available resources on the node */ - public abstract Resource getAvailableResource(); + public synchronized Resource getAvailableResource() { + return this.availableResource; + } /** - * Get number of active containers on the node. - * @return number of active containers on the node - */ - public abstract int getNumContainers(); - - /** - * Apply delta resource on node's available resource. - * @param deltaResource the delta of resource need to apply to node + * Get used resources on the node. + * + * @return used resources on the node */ - public abstract void applyDeltaOnAvailableResource(Resource deltaResource); + public synchronized Resource getUsedResource() { + return this.usedResource; + } /** * Get total resources on the node. + * * @return total resources on the node. */ - public abstract Resource getTotalResource(); - + public Resource getTotalResource() { + return this.totalResourceCapability; + } + + private synchronized boolean isValidContainer(Container c) { + if (launchedContainers.containsKey(c.getId())) + return true; + return false; + } + + private synchronized void updateResource(Container container) { + addAvailableResource(container.getResource()); + --numContainers; + } + /** - * Get the ID of the node which contains both its hostname and port. - * @return the ID of the node + * Release an allocated container on this node. + * + * @param container + * container to be released + */ + public synchronized void releaseContainer(Container container) { + if (!isValidContainer(container)) { + LOG.error("Invalid container released " + container); + return; + } + + /* remove the containers from the nodemanger */ + if (null != launchedContainers.remove(container.getId())) { + updateResource(container); + } + + LOG.info("Released container " + container.getId() + " of capacity " + + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + getAvailableResource() + + " available" + ", release resources=" + true); + } + + private synchronized void addAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid resource addition of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.addTo(availableResource, resource); + Resources.subtractFrom(usedResource, resource); + } + + private synchronized void deductAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.subtractFrom(availableResource, resource); + Resources.addTo(usedResource, resource); + } + + /** + * Reserve container for the attempt on this node. */ - public abstract NodeId getNodeID(); + public abstract void reserveResource(SchedulerApplicationAttempt attempt, + Priority priority, RMContainer reservedContainer); + /** + * Unreserve resources on this node. + */ + public abstract void + unreserveResource(SchedulerApplicationAttempt attempt); + + @Override + public String toString() { + return "host: " + rmNode.getNodeAddress() + " #containers=" + + getNumContainers() + " available=" + + getAvailableResource().getMemory() + " used=" + + getUsedResource().getMemory(); + } + + /** + * Get number of active containers on the node. + * + * @return number of active containers on the node + */ + public int getNumContainers() { + return numContainers; + } + + public synchronized List getRunningContainers() { + return new ArrayList(launchedContainers.values()); + } + + public synchronized RMContainer getReservedContainer() { + return reservedContainer; + } + + /** + * Apply delta resource on node's available resource. + * + * @param deltaResource + * the delta of resource need to apply to node + */ + public synchronized void + applyDeltaOnAvailableResource(Resource deltaResource) { + // we can only adjust available resource if total resource is changed. + Resources.addTo(this.availableResource, deltaResource); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 2348603..21eba39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -69,7 +69,15 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, @Public @Stable public List getQueueUserAclInfo(); - + + /** + * Get the whole resource capacity of the cluster. + * @return the whole resource capacity of the cluster. + */ + @LimitedPrivate("yarn") + @Unstable + public Resource getClusterResource(); + /** * Get minimum allocatable {@link Resource}. * @return minimum allocatable resource @@ -182,7 +190,7 @@ boolean checkAccess(UserGroupInformation callerUGI, @LimitedPrivate("yarn") @Unstable public RMContainer getRMContainer(ContainerId containerId); - + /** * Moves the given application to the given queue * @param appId diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e28c18c..9dd3009 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -41,7 +41,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -49,11 +48,9 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -75,9 +72,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -182,16 +178,8 @@ public Configuration getConf() { private Map queues = new ConcurrentHashMap(); - private Map nodes = - new ConcurrentHashMap(); - - private Resource clusterResource = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); private int numNodeManagers = 0; - private Resource minimumAllocation; - private Resource maximumAllocation; - private boolean initialized = false; private ResourceCalculator calculator; @@ -231,16 +219,6 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() { } @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; - } - - @Override public Comparator getApplicationComparator() { return applicationComparator; } @@ -264,11 +242,6 @@ public synchronized int getNumClusterNodes() { public RMContext getRMContext() { return this.rmContext; } - - @Override - public Resource getClusterResources() { - return clusterResource; - } @Override public synchronized void @@ -331,16 +304,16 @@ long getAsyncScheduleInterval() { static void schedule(CapacityScheduler cs) { // First randomize the start point int current = 0; - Collection nodes = cs.getAllNodes().values(); + Collection nodes = cs.getAllNodes().values(); int start = random.nextInt(nodes.size()); - for (FiCaSchedulerNode node : nodes) { + for (SchedulerNode node : nodes) { if (current++ >= start) { - cs.allocateContainersToNode(node); + cs.allocateContainersToNode((FiCaSchedulerNode)node); } } // Now, just get everyone to be safe - for (FiCaSchedulerNode node : nodes) { - cs.allocateContainersToNode(node); + for (SchedulerNode node : nodes) { + cs.allocateContainersToNode((FiCaSchedulerNode)node); } try { Thread.sleep(cs.getAsyncScheduleInterval()); @@ -659,7 +632,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Sanity check SchedulerUtils.normalizeRequests( - ask, getResourceCalculator(), getClusterResources(), + ask, getResourceCalculator(), getClusterResource(), getMinimumResourceCapability(), maximumAllocation); // Release containers @@ -822,7 +795,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - if (Resources.greaterThanOrEqual(calculator, getClusterResources(), + if (Resources.greaterThanOrEqual(calculator, getClusterResource(), node.getAvailableResource(), minimumAllocation)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + @@ -942,7 +915,7 @@ private synchronized void addNode(RMNode nodeManager) { } private synchronized void removeNode(RMNode nodeInfo) { - FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); + FiCaSchedulerNode node = (FiCaSchedulerNode)nodes.get(nodeInfo.getNodeID()); if (node == null) { return; } @@ -1015,56 +988,27 @@ private synchronized void completedContainer(RMContainer rmContainer, @Lock(Lock.NoLock.class) @VisibleForTesting - public FiCaSchedulerApp getApplicationAttempt( - ApplicationAttemptId applicationAttemptId) { - SchedulerApplication app = - applications.get(applicationAttemptId.getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : new SchedulerAppReport(app); - } - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( + public FiCaSchedulerApp getApplicationAttempt( ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : app.getResourceUsageReport(); + return (FiCaSchedulerApp) super.getApplicationAttempt(applicationAttemptId); } @Lock(Lock.NoLock.class) FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return (FiCaSchedulerNode)nodes.get(nodeId); } @Lock(Lock.NoLock.class) - Map getAllNodes() { + Map getAllNodes() { return nodes; } - - @Override - public RMContainer getRMContainer(ContainerId containerId) { - FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); - return (attempt == null) ? null : attempt.getRMContainer(containerId); - } @VisibleForTesting + @Override public FiCaSchedulerApp getCurrentAttemptForContainer( ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; + return (FiCaSchedulerApp) super.getCurrentAttemptForContainer(containerId); } @Override @@ -1074,12 +1018,6 @@ public void recover(RMState state) throws Exception { } @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - FiCaSchedulerNode node = getNode(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - - @Override public void dropContainerReservation(RMContainer container) { if(LOG.isDebugEnabled()){ LOG.debug("DROP_RESERVATION:" + container.toString()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 3dd4c6d..a3dbc35 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -43,7 +43,7 @@ RMContext getRMContext(); - Resource getClusterResources(); + Resource getClusterResource(); /** * Get the yarn configuration. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 971edb8..5ddb9a4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -174,12 +174,12 @@ public LeafQueue(CapacitySchedulerContext cs, int maxActiveApplications = CSQueueUtils.computeMaxActiveApplications( resourceCalculator, - cs.getClusterResources(), this.minimumAllocation, + cs.getClusterResource(), this.minimumAllocation, maxAMResourcePerQueuePercent, absoluteMaxCapacity); this.maxActiveAppsUsingAbsCap = CSQueueUtils.computeMaxActiveApplications( resourceCalculator, - cs.getClusterResources(), this.minimumAllocation, + cs.getClusterResource(), this.minimumAllocation, maxAMResourcePerQueuePercent, absoluteCapacity); int maxActiveApplicationsPerUser = CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, @@ -195,7 +195,7 @@ public LeafQueue(CapacitySchedulerContext cs, cs.getConfiguration().getAcls(getQueuePath()); setupQueueConfigs( - cs.getClusterResources(), + cs.getClusterResource(), capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 1f09475..dba92a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -143,7 +143,7 @@ public ParentQueue(CapacitySchedulerContext cs, this.queueInfo.setQueueName(queueName); this.queueInfo.setChildQueues(new ArrayList()); - setupQueueConfigs(cs.getClusterResources(), + setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, state, acls); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 58f12d0..3133c6d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -18,216 +18,48 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.util.resource.Resources; public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private Resource availableResource = recordFactory.newRecordInstance(Resource.class); - private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - private Resource totalResourceCapability; - - private volatile int numContainers; - - private RMContainer reservedContainer; - - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap(); - - private final RMNode rmNode; - private final String nodeName; - public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { - this.rmNode = node; - this.availableResource.setMemory(node.getTotalCapability().getMemory()); - this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores()); - totalResourceCapability = - Resource.newInstance(node.getTotalCapability().getMemory(), node - .getTotalCapability().getVirtualCores()); - if (usePortForNodeName) { - nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); - } else { - nodeName = rmNode.getHostName(); - } - } - - public RMNode getRMNode() { - return this.rmNode; - } - - public NodeId getNodeID() { - return this.rmNode.getNodeID(); - } - - public String getHttpAddress() { - return this.rmNode.getHttpAddress(); + super(node, usePortForNodeName); } @Override - public String getNodeName() { - return nodeName; - } - - @Override - public String getRackName() { - return this.rmNode.getRackName(); - } - - /** - * The Scheduler has allocated containers on this node to the - * given application. - * - * @param applicationId application - * @param rmContainer allocated container - */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { - Container container = rmContainer.getContainer(); - deductAvailableResource(container.getResource()); - ++numContainers; - - launchedContainers.put(container.getId(), rmContainer); - - LOG.info("Assigned container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + - getAvailableResource() + " available"); - } - - @Override - public synchronized Resource getAvailableResource() { - return this.availableResource; - } - - @Override - public synchronized Resource getUsedResource() { - return this.usedResource; - } - - @Override - public Resource getTotalResource() { - return this.totalResourceCapability; - } - - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) - return true; - return false; - } - - private synchronized void updateResource(Container container) { - addAvailableResource(container.getResource()); - --numContainers; - } - - /** - * Release an allocated container on this node. - * @param container container to be released - */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { - LOG.error("Invalid container released " + container); - return; - } - - /* remove the containers from the nodemanger */ - if (null != launchedContainers.remove(container.getId())) { - updateResource(container); - } - - LOG.info("Released container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + getAvailableResource() - + " available" + ", release resources=" + true); - } - - - private synchronized void addAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(availableResource, resource); - Resources.subtractFrom(usedResource, resource); - } - - private synchronized void deductAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(availableResource, resource); - Resources.addTo(usedResource, resource); - } - - @Override - public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource().getMemory() + - " used=" + getUsedResource().getMemory(); - } - - @Override - public int getNumContainers() { - return numContainers; - } - - public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); - } - public synchronized void reserveResource( SchedulerApplicationAttempt application, Priority priority, RMContainer reservedContainer) { // Check if it's already reserved - if (this.reservedContainer != null) { + if (super.reservedContainer != null) { // Sanity check if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { throw new IllegalStateException("Trying to reserve" + " container " + reservedContainer + " on node " + reservedContainer.getReservedNode() + - " when currently" + " reserved resource " + this.reservedContainer + - " on node " + this.reservedContainer.getReservedNode()); + " when currently" + " reserved resource " + super.reservedContainer + + " on node " + super.reservedContainer.getReservedNode()); } // Cannot reserve more than one application attempt on a given node! // Reservation is still against attempt. - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( + if (!super.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( reservedContainer.getContainer().getId().getApplicationAttemptId())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + + " container " + reservedContainer + " for application " + application.getApplicationAttemptId() + " when currently" + - " reserved container " + this.reservedContainer + + " reserved container " + super.reservedContainer + " on node " + this); } @@ -245,9 +77,10 @@ public synchronized void reserveResource( + application.getApplicationAttemptId()); } } - this.reservedContainer = reservedContainer; + super.reservedContainer = reservedContainer; } + @Override public synchronized void unreserveResource( SchedulerApplicationAttempt application) { @@ -271,15 +104,4 @@ public synchronized void unreserveResource( } reservedContainer = null; } - - public synchronized RMContainer getReservedContainer() { - return reservedContainer; - } - - @Override - public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { - // we can only adjust available resource if total resource is changed. - Resources.addTo(this.availableResource, deltaResource); - } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index c265fcf..1e94046 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -119,9 +119,9 @@ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) { // TODO: we might change these queue metrics around a little bit // to match the semantics of the fair scheduler. queueInfo.setCapacity((float) getFairShare().getMemory() / - scheduler.getClusterCapacity().getMemory()); + scheduler.getClusterResource().getMemory()); queueInfo.setCapacity((float) getResourceUsage().getMemory() / - scheduler.getClusterCapacity().getMemory()); + scheduler.getClusterResource().getMemory()); ArrayList childQueueInfos = new ArrayList(); if (includeChildQueues) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 97ea6d4..6efc8ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -18,28 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable @@ -47,189 +35,35 @@ private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private Resource availableResource; - private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - private Resource totalResourceCapability; - - private volatile int numContainers; - - private RMContainer reservedContainer; private AppSchedulable reservedAppSchedulable; - - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap(); - - private final RMNode rmNode; - private final String nodeName; public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { - this.rmNode = node; - this.availableResource = Resources.clone(node.getTotalCapability()); - totalResourceCapability = - Resource.newInstance(node.getTotalCapability().getMemory(), node - .getTotalCapability().getVirtualCores()); - if (usePortForNodeName) { - nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); - } else { - nodeName = rmNode.getHostName(); - } - } - - public RMNode getRMNode() { - return rmNode; - } - - public NodeId getNodeID() { - return rmNode.getNodeID(); - } - - public String getHttpAddress() { - return rmNode.getHttpAddress(); - } - - @Override - public String getNodeName() { - return nodeName; + super(node, usePortForNodeName); } @Override - public String getRackName() { - return rmNode.getRackName(); - } - - /** - * The Scheduler has allocated containers on this node to the - * given application. - * - * @param applicationId application - * @param rmContainer allocated container - */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { - Container container = rmContainer.getContainer(); - deductAvailableResource(container.getResource()); - ++numContainers; - - launchedContainers.put(container.getId(), rmContainer); - - LOG.info("Assigned container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + - getAvailableResource() + " available"); - } - - @Override - public synchronized Resource getAvailableResource() { - return availableResource; - } - - @Override - public synchronized Resource getUsedResource() { - return usedResource; - } - - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) { - return true; - } - return false; - } - - private synchronized void updateResource(Container container) { - addAvailableResource(container.getResource()); - --numContainers; - } - - /** - * Release an allocated container on this node. - * @param container container to be released - */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { - LOG.error("Invalid container released " + container); - return; - } - - /* remove the containers from the nodemanger */ - launchedContainers.remove(container.getId()); - updateResource(container); - - LOG.info("Released container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + getAvailableResource() - + " available" + ", release resources=" + true); - } - - - private synchronized void addAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(availableResource, resource); - Resources.subtractFrom(usedResource, resource); - } - - @Override - public Resource getTotalResource() { - return this.totalResourceCapability; - } - - private synchronized void deductAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(availableResource, resource); - Resources.addTo(usedResource, resource); - } - - @Override - public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource() + - " used=" + getUsedResource(); - } - - @Override - public int getNumContainers() { - return numContainers; - } - - public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); - } - public synchronized void reserveResource( - FSSchedulerApp application, Priority priority, + SchedulerApplicationAttempt application, Priority priority, RMContainer reservedContainer) { // Check if it's already reserved - if (this.reservedContainer != null) { + if (super.reservedContainer != null) { // Sanity check if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { throw new IllegalStateException("Trying to reserve" + " container " + reservedContainer + " on node " + reservedContainer.getReservedNode() + - " when currently" + " reserved resource " + this.reservedContainer + - " on node " + this.reservedContainer.getReservedNode()); + " when currently" + " reserved resource " + super.reservedContainer + + " on node " + super.reservedContainer.getReservedNode()); } // Cannot reserve more than one application on a given node! - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( + if (!super.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( reservedContainer.getContainer().getId().getApplicationAttemptId())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + + " container " + reservedContainer + " for application " + application.getApplicationId() + " when currently" + - " reserved container " + this.reservedContainer + + " reserved container " + super.reservedContainer + " on node " + this); } @@ -240,12 +74,14 @@ public synchronized void reserveResource( LOG.info("Reserved container " + reservedContainer.getContainer().getId() + " on node " + this + " for application " + application); } - this.reservedContainer = reservedContainer; - this.reservedAppSchedulable = application.getAppSchedulable(); + super.reservedContainer = reservedContainer; + this.reservedAppSchedulable = + ((FSSchedulerApp) application).getAppSchedulable(); } + @Override public synchronized void unreserveResource( - FSSchedulerApp application) { + SchedulerApplicationAttempt application) { // Cannot unreserve for wrong application... ApplicationAttemptId reservedApplication = reservedContainer.getContainer().getId().getApplicationAttemptId(); @@ -258,22 +94,11 @@ public synchronized void unreserveResource( " on node " + this); } - this.reservedContainer = null; + super.reservedContainer = null; this.reservedAppSchedulable = null; } - public synchronized RMContainer getReservedContainer() { - return reservedContainer; - } - public synchronized AppSchedulable getReservedAppSchedulable() { return reservedAppSchedulable; } - - @Override - public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { - // we can only adjust available resource if total resource is changed. - Resources.addTo(this.availableResource, deltaResource); - } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fab9ebe..bb11fc9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -39,7 +39,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -53,7 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -76,10 +74,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -125,8 +121,7 @@ public class FairScheduler extends AbstractYarnScheduler { private boolean initialized; private FairSchedulerConfiguration conf; - private Resource minimumAllocation; - private Resource maximumAllocation; + private Resource incrAllocation; private QueueManager queueMgr; private Clock clock; @@ -152,14 +147,6 @@ // Time we last ran preemptTasksIfNecessary private long lastPreemptCheckTime; - // Nodes in the cluster, indexed by NodeId - private Map nodes = - new ConcurrentHashMap(); - - // Aggregate capacity of the cluster - private Resource clusterCapacity = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); - // How often tasks are preempted protected long preemptionInterval; @@ -247,20 +234,8 @@ public QueueManager getQueueManager() { } @Override - public RMContainer getRMContainer(ContainerId containerId) { - FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId); - return (attempt == null) ? null : attempt.getRMContainer(containerId); - } - - private FSSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); - } - return null; + protected FSSchedulerApp getCurrentAttemptForContainer(ContainerId containerId) { + return (FSSchedulerApp) super.getCurrentAttemptForContainer(containerId); } /** @@ -294,7 +269,7 @@ protected synchronized void update() { // Recursively update demands for all queues rootQueue.updateDemand(); - rootQueue.setFairShare(clusterCapacity); + rootQueue.setFairShare(clusterResource); // Recursively compute fair shares for all queues // and update metrics rootQueue.recomputeShares(); @@ -322,9 +297,9 @@ private void updatePreemptionVariables() { * Is a queue below its min share for the given task type? */ boolean isStarvedForMinShare(FSLeafQueue sched) { - Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredShare); } @@ -333,9 +308,9 @@ boolean isStarvedForMinShare(FSLeafQueue sched) { * defined as being below half its fair share. */ boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredFairShare); } @@ -362,7 +337,7 @@ protected synchronized void preemptTasksIfNecessary() { for (FSLeafQueue sched : queueMgr.getLeafQueues()) { resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); } - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { preemptResources(queueMgr.getLeafQueues(), resToPreempt); } @@ -389,7 +364,7 @@ protected void preemptResources(Collection scheds, // Collect running containers from over-scheduled queues List runningContainers = new ArrayList(); for (FSLeafQueue sched : scheds) { - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), sched.getFairShare())) { for (AppSchedulable as : sched.getRunnableAppSchedulables()) { for (RMContainer c : as.getApp().getLiveContainers()) { @@ -421,7 +396,7 @@ public int compare(RMContainer c1, RMContainer c2) { while (warnedIter.hasNext()) { RMContainer container = warnedIter.next(); if (container.getState() == RMContainerState.RUNNING && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, toPreempt, Resources.none())) { warnOrKillContainer(container, apps.get(container), queues.get(container)); preemptedThisRound.add(container); @@ -435,12 +410,12 @@ public int compare(RMContainer c1, RMContainer c2) { // sure we don't preempt too many from any queue Iterator runningIter = runningContainers.iterator(); while (runningIter.hasNext() && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, toPreempt, Resources.none())) { RMContainer container = runningIter.next(); FSLeafQueue sched = queues.get(container); if (!preemptedThisRound.contains(container) && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), sched.getFairShare())) { warnOrKillContainer(container, apps.get(container), sched); @@ -496,20 +471,20 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); - resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToMinShare, resDueToFairShare); - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { String message = "Should preempt " + resToPreempt + " res for queue " + sched.getName() + ": resDueToMinShare = " + resDueToMinShare @@ -540,18 +515,12 @@ public synchronized ResourceWeights getAppWeight(AppSchedulable app) { return resourceWeights; } - @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - public Resource getIncrementResourceCapability() { return incrAllocation; } - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; + private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) { + return (FSSchedulerNode)nodes.get(nodeId); } public double getNodeLocalityThreshold() { @@ -578,10 +547,6 @@ public synchronized int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } - public Resource getClusterCapacity() { - return clusterCapacity; - } - public synchronized Clock getClock() { return clock; } @@ -809,7 +774,7 @@ private synchronized void completedContainer(RMContainer rmContainer, } // Get the node on which the container was allocated - FSSchedulerNode node = nodes.get(container.getNodeId()); + FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { application.unreserve(node, rmContainer.getReservedPriority()); @@ -827,20 +792,20 @@ private synchronized void completedContainer(RMContainer rmContainer, private synchronized void addNode(RMNode node) { nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); - Resources.addTo(clusterCapacity, node.getTotalCapability()); + Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); LOG.info("Added node " + node.getNodeAddress() + - " cluster capacity: " + clusterCapacity); + " cluster capacity: " + clusterResource); } private synchronized void removeNode(RMNode rmNode) { - FSSchedulerNode node = nodes.get(rmNode.getNodeID()); + FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID()); // This can occur when an UNHEALTHY node reconnects if (node == null) { return; } - Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); + Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); updateRootQueueMetrics(); // Remove running containers @@ -865,7 +830,7 @@ private synchronized void removeNode(RMNode rmNode) { nodes.remove(rmNode.getNodeID()); LOG.info("Removed node " + rmNode.getNodeAddress() + - " cluster capacity: " + clusterCapacity); + " cluster capacity: " + clusterResource); } @Override @@ -882,7 +847,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation); + clusterResource, minimumAllocation, maximumAllocation, incrAllocation); // Release containers for (ContainerId releasedContainerId : release) { @@ -961,13 +926,13 @@ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode no */ private synchronized void nodeUpdate(RMNode nm) { if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); + LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = nodes.get(nm.getNodeID()); + FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); + SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); @@ -1012,7 +977,7 @@ private void continuousScheduling() { // iterate all nodes for (NodeId nodeId : nodeIdList) { if (nodes.containsKey(nodeId)) { - FSSchedulerNode node = nodes.get(nodeId); + FSSchedulerNode node = getFSSchedulerNode(nodeId); try { if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) { @@ -1038,7 +1003,7 @@ private void continuousScheduling() { @Override public int compare(NodeId n1, NodeId n2) { - return RESOURCE_CALCULATOR.compare(clusterCapacity, + return RESOURCE_CALCULATOR.compare(clusterResource, nodes.get(n2).getAvailableResource(), nodes.get(n1).getAvailableResource()); } @@ -1075,7 +1040,7 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { int assignedContainers = 0; while (node.getReservedContainer() == null) { boolean assignedContainer = false; - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, queueMgr.getRootQueue().assignContainer(node), Resources.none())) { assignedContainers++; @@ -1089,45 +1054,8 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { updateRootQueueMetrics(); } - @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - FSSchedulerNode node = nodes.get(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - SchedulerApplication app = - applications.get(appAttemptId.getApplicationId()); - if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); - if (attempt == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); - } - return null; - } - return new SchedulerAppReport(attempt); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); - if (attempt == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); - } - return null; - } - return attempt.getResourceUsageReport(); + return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId); } /** @@ -1139,7 +1067,7 @@ public ApplicationResourceUsageReport getAppResourceUsageReport( private void updateRootQueueMetrics() { rootMetrics.setAvailableResourcesToQueue( Resources.subtract( - clusterCapacity, rootMetrics.getAllocatedResources())); + clusterResource, rootMetrics.getAllocatedResources())); } @Override @@ -1365,7 +1293,7 @@ public void onReload(AllocationConfiguration queueInfo) { // if it does not already exist, so it can be displayed on the web UI. synchronized (FairScheduler.this) { allocConf = queueInfo; - allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity); + allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); queueMgr.updateAllocationConfiguration(allocConf); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 44918f3..4f8735b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -369,7 +369,7 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { // Set scheduling policies try { SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); - policy.initialize(scheduler.getClusterCapacity()); + policy.initialize(scheduler.getClusterResource()); queue.setPolicy(policy); } catch (AllocationConfigurationException ex) { LOG.warn("Cannot apply configured scheduling policy to queue " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 82000e1..1a29d76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; @@ -38,7 +37,6 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -76,11 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -114,11 +110,7 @@ Configuration conf; - protected Map nodes = new ConcurrentHashMap(); - private boolean initialized; - private Resource minimumAllocation; - private Resource maximumAllocation; private boolean usePortForNodeName; private ActiveUsersManager activeUsersManager; @@ -218,19 +210,9 @@ public synchronized Configuration getConf() { } @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - - @Override public int getNumClusterNodes() { return nodes.size(); } - - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; - } @Override public synchronized void @@ -333,31 +315,14 @@ public Allocation allocate( } @VisibleForTesting - FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { - SchedulerApplication app = - applications.get(applicationAttemptId.getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - @Override - public SchedulerAppReport getSchedulerAppInfo( + public FiCaSchedulerApp getApplicationAttempt( ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : new SchedulerAppReport(app); + return (FiCaSchedulerApp) super.getApplicationAttempt(applicationAttemptId); } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : app.getResourceUsageReport(); - } - + private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return (FiCaSchedulerNode)nodes.get(nodeId); } private synchronized void addApplication(ApplicationId applicationId, @@ -856,7 +821,6 @@ private synchronized void containerCompleted(RMContainer rmContainer, } - private Resource clusterResource = recordFactory.newRecordInstance(Resource.class); private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { @@ -903,26 +867,14 @@ public void recover(RMState state) { } @Override - public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) { - FiCaSchedulerNode node = getNode(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - - @Override public RMContainer getRMContainer(ContainerId containerId) { FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); return (attempt == null) ? null : attempt.getRMContainer(containerId); } - private FiCaSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; + protected FiCaSchedulerApp + getCurrentAttemptForContainer(ContainerId containerId) { + return (FiCaSchedulerApp) super.getCurrentAttemptForContainer(containerId); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index 1c5a79b..2c1bc47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -70,7 +70,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { queueName = queue.getName(); schedulingPolicy = queue.getPolicy().getName(); - clusterResources = new ResourceInfo(scheduler.getClusterCapacity()); + clusterResources = new ResourceInfo(scheduler.getClusterResource()); usedResources = new ResourceInfo(queue.getResourceUsage()); fractionMemUsed = (float)usedResources.getMemory() / @@ -81,7 +81,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { maxResources = new ResourceInfo(queue.getMaxShare()); maxResources = new ResourceInfo( Resources.componentwiseMin(queue.getMaxShare(), - scheduler.getClusterCapacity())); + scheduler.getClusterResource())); fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory(); fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 713962b..a259c6d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -383,7 +383,7 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { Resource clusterResources = Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0); - when(mCS.getClusterResources()).thenReturn(clusterResources); + when(mCS.getClusterResource()).thenReturn(clusterResources); return policy; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 2b548ef..a9a9975 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -81,7 +81,7 @@ public void setUp() throws IOException { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); @@ -165,7 +165,7 @@ public void testLimitsComputation() throws Exception { // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16); - when(csContext.getClusterResources()).thenReturn(clusterResource); + when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); CSQueue root = @@ -478,7 +478,7 @@ public void testHeadroom() throws Exception { // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB); - when(csContext.getClusterResources()).thenReturn(clusterResource); + when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); CapacityScheduler.parseQueue(csContext, csConf, null, "root", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index d0ba334..d545c9b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -465,14 +465,14 @@ public void testReconnectedNode() throws Exception { cs.handle(new NodeAddedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n2)); - Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory()); + Assert.assertEquals(6 * GB, cs.getClusterResource().getMemory()); // reconnect n1 with downgraded memory n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); cs.handle(new NodeRemovedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n1)); - Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); + Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory()); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index d509771..fd14ef6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -89,7 +89,7 @@ public void setUp() throws Exception { Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index bdf89bb..2a26d30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -122,7 +122,7 @@ public void setUp() throws Exception { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); @@ -1651,7 +1651,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { newQueues, queues, TestUtils.spyHook); queues = newQueues; - root.reinitialize(newRoot, cs.getClusterResources()); + root.reinitialize(newRoot, cs.getClusterResource()); // after reinitialization assertEquals(3, e.activeApplications.size()); @@ -1676,7 +1676,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception { newQueues, queues, TestUtils.spyHook); queues = newQueues; - root.reinitialize(newRoot, cs.getClusterResources()); + root.reinitialize(newRoot, cs.getClusterResource()); // after reinitialization assertEquals(60, e.getNodeLocalityDelay()); @@ -2070,7 +2070,7 @@ private CapacitySchedulerContext mockCSContext( when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConf()).thenReturn(new YarnConfiguration()); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getClusterResources()).thenReturn(clusterResource); + when(csContext.getClusterResource()).thenReturn(clusterResource); when(csContext.getMinimumResourceCapability()).thenReturn( Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 652fcf1..fa9edb1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -86,7 +86,7 @@ public void setUp() throws Exception { Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2524763..3983e70 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -377,19 +377,19 @@ public void testAggregateCapacityTracking() throws Exception { .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - assertEquals(1024, scheduler.getClusterCapacity().getMemory()); + assertEquals(1024, scheduler.getClusterResource().getMemory()); // Add another node RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); - assertEquals(1536, scheduler.getClusterCapacity().getMemory()); + assertEquals(1536, scheduler.getClusterResource().getMemory()); // Remove the first node NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1); scheduler.handle(nodeEvent3); - assertEquals(512, scheduler.getClusterCapacity().getMemory()); + assertEquals(512, scheduler.getClusterResource().getMemory()); } @Test @@ -2123,7 +2123,7 @@ public void testBasicDRFAssignment() throws Exception { FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); @@ -2167,7 +2167,7 @@ public void testBasicDRFWithQueues() throws Exception { FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); @@ -2210,7 +2210,7 @@ public void testDRFHierarchicalQueues() throws Exception { FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy); @@ -2466,8 +2466,8 @@ public void testContinuousScheduling() throws Exception { fs.handle(nodeEvent2); // available resource - Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024); - Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16); + Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024); + Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16); // send application request ApplicationAttemptId appAttemptId = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index f6dfc3f..7b1fed8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - import static org.mockito.Mockito.mock; import java.io.IOException; @@ -30,8 +29,6 @@ import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -60,13 +57,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -78,6 +73,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -252,7 +248,7 @@ public void testUpdateResourceOnNode() throws Exception { FifoScheduler scheduler = new FifoScheduler(){ @SuppressWarnings("unused") - public Map getNodes(){ + public Map getNodes(){ return nodes; } };