diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index f923733..046b213 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -36,6 +36,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -872,4 +874,11 @@ public String moveApplication(ApplicationId appId, String newQueue) throws YarnException { return scheduler.moveApplication(appId, newQueue); } + + @Override + @LimitedPrivate("yarn") + @Unstable + public Resource getClusterResource() { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 0fa1a9e..79ce746 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -142,6 +142,17 @@ + + + + + + + + + + + @@ -178,12 +189,6 @@ - - - - - - diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 684c82b..f94aedbf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -170,7 +170,7 @@ public void init(Configuration config, public void editSchedule(){ CSQueue root = scheduler.getRootQueue(); Resource clusterResources = - Resources.clone(scheduler.getClusterResources()); + Resources.clone(scheduler.getClusterResource()); containerBasedPreemptOrKill(root, clusterResources); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0f3af41..642cd31 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -22,21 +22,41 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; -public abstract class AbstractYarnScheduler implements ResourceScheduler { +public abstract class AbstractYarnScheduler + + implements ResourceScheduler { + + private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); + + // Nodes in the cluster, indexed by NodeId + protected Map nodes = + new ConcurrentHashMap(); + + // Whole capacity of the cluster + protected Resource clusterResource = Resource.newInstance(0, 0); + + protected Resource minimumAllocation; + protected Resource maximumAllocation; protected RMContext rmContext; - protected Map applications; + protected Map> applications; protected final static List EMPTY_CONTAINER_LIST = new ArrayList(); protected static final Allocation EMPTY_ALLOCATION = new Allocation( @@ -45,7 +65,7 @@ public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); - SchedulerApplication app = applications.get(appId); + SchedulerApplication app = applications.get(appId); List containerList = new ArrayList(); RMApp appImpl = this.rmContext.getRMApps().get(appId); if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) { @@ -64,10 +84,75 @@ return containerList; } - public Map getSchedulerApplications() { + public Map> + getSchedulerApplications() { return applications; } - + + @Override + public Resource getClusterResource() { + return clusterResource; + } + + @Override + public Resource getMinimumResourceCapability() { + return minimumAllocation; + } + + @Override + public Resource getMaximumResourceCapability() { + return maximumAllocation; + } + + public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { + SchedulerApplication app = + applications.get(applicationAttemptId.getApplicationId()); + return app == null ? null : app.getCurrentAppAttempt(); + } + + @Override + public SchedulerAppReport getSchedulerAppInfo( + ApplicationAttemptId appAttemptId) { + SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); + if (attempt == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); + } + return null; + } + return new SchedulerAppReport(attempt); + } + + @Override + public ApplicationResourceUsageReport getAppResourceUsageReport( + ApplicationAttemptId appAttemptId) { + SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); + if (attempt == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); + } + return null; + } + return attempt.getResourceUsageReport(); + } + + public T getCurrentAttemptForContainer(ContainerId containerId) { + return getApplicationAttempt(containerId.getApplicationAttemptId()); + } + + @Override + public RMContainer getRMContainer(ContainerId containerId) { + SchedulerApplicationAttempt attempt = + getCurrentAttemptForContainer(containerId); + return (attempt == null) ? null : attempt.getRMContainer(containerId); + } + + @Override + public SchedulerNodeReport getNodeReport(NodeId nodeId) { + N node = nodes.get(nodeId); + return node == null ? null : new SchedulerNodeReport(node); + } + @Override public String moveApplication(ApplicationId appId, String newQueue) throws YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 4d6ca0e..2c788aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -23,11 +23,11 @@ @Private @Unstable -public class SchedulerApplication { +public class SchedulerApplication { private Queue queue; private final String user; - private SchedulerApplicationAttempt currentAttempt; + private T currentAttempt; public SchedulerApplication(Queue queue, String user) { this.queue = queue; @@ -46,11 +46,11 @@ public String getUser() { return user; } - public SchedulerApplicationAttempt getCurrentAppAttempt() { + public T getCurrentAppAttempt() { return currentAttempt; } - public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) { + public void setCurrentAppAttempt(T currentAttempt) { this.currentAttempt = currentAttempt; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 524b1ab..85d016b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -18,11 +18,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.base.Preconditions; /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. @@ -31,59 +47,231 @@ @Unstable public abstract class SchedulerNode { + private static final Log LOG = LogFactory.getLog(SchedulerNode.class); + + private Resource availableResource = Resource.newInstance(0, 0); + private Resource usedResource = Resource.newInstance(0, 0); + private Resource totalResourceCapability; + private RMContainer reservedContainer; + private volatile int numContainers; + + + /* set of containers that are allocated containers */ + private final Map launchedContainers = + new HashMap(); + + private final RMNode rmNode; + private final String nodeName; + + public SchedulerNode(RMNode node, boolean usePortForNodeName) { + this.rmNode = node; + this.availableResource = Resources.clone(node.getTotalCapability()); + this.totalResourceCapability = Resources.clone(node.getTotalCapability()); + if (usePortForNodeName) { + nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); + } else { + nodeName = rmNode.getHostName(); + } + } + + public RMNode getRMNode() { + return this.rmNode; + } + + /** + * Get the ID of the node which contains both its hostname and port. + * + * @return the ID of the node + */ + public NodeId getNodeID() { + return this.rmNode.getNodeID(); + } + + public String getHttpAddress() { + return this.rmNode.getHttpAddress(); + } + /** * Get the name of the node for scheduling matching decisions. *

- * Typically this is the 'hostname' reported by the node, but it could be - * configured to be 'hostname:port' reported by the node via the + * Typically this is the 'hostname' reported by the node, but it could be + * configured to be 'hostname:port' reported by the node via the * {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant. * The main usecase of this is Yarn minicluster to be able to differentiate * node manager instances by their port number. * * @return name of the node for scheduling matching decisions. */ - public abstract String getNodeName(); - + public String getNodeName() { + return nodeName; + } + /** * Get rackname. + * * @return rackname */ - public abstract String getRackName(); - + public String getRackName() { + return this.rmNode.getRackName(); + } + /** - * Get used resources on the node. - * @return used resources on the node + * The Scheduler has allocated containers on this node to the given + * application. + * + * @param applicationId + * application + * @param rmContainer + * allocated container */ - public abstract Resource getUsedResource(); + public synchronized void allocateContainer(ApplicationId applicationId, + RMContainer rmContainer) { + Container container = rmContainer.getContainer(); + deductAvailableResource(container.getResource()); + ++numContainers; + + launchedContainers.put(container.getId(), rmContainer); + + LOG.info("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + getAvailableResource() + + " available"); + } /** * Get available resources on the node. + * * @return available resources on the node */ - public abstract Resource getAvailableResource(); + public synchronized Resource getAvailableResource() { + return this.availableResource; + } /** - * Get number of active containers on the node. - * @return number of active containers on the node - */ - public abstract int getNumContainers(); - - /** - * Apply delta resource on node's available resource. - * @param deltaResource the delta of resource need to apply to node + * Get used resources on the node. + * + * @return used resources on the node */ - public abstract void applyDeltaOnAvailableResource(Resource deltaResource); + public synchronized Resource getUsedResource() { + return this.usedResource; + } /** * Get total resources on the node. + * * @return total resources on the node. */ - public abstract Resource getTotalResource(); - + public Resource getTotalResource() { + return this.totalResourceCapability; + } + + private synchronized boolean isValidContainer(Container c) { + if (launchedContainers.containsKey(c.getId())) { + return true; + } + return false; + } + + private synchronized void updateResource(Container container) { + addAvailableResource(container.getResource()); + --numContainers; + } + /** - * Get the ID of the node which contains both its hostname and port. - * @return the ID of the node + * Release an allocated container on this node. + * + * @param container + * container to be released + */ + public synchronized void releaseContainer(Container container) { + if (!isValidContainer(container)) { + LOG.error("Invalid container released " + container); + return; + } + + /* remove the containers from the nodemanger */ + if (null != launchedContainers.remove(container.getId())) { + updateResource(container); + } + + LOG.info("Released container " + container.getId() + " of capacity " + + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + getAvailableResource() + + " available" + ", release resources=" + true); + } + + private synchronized void addAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid resource addition of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.addTo(availableResource, resource); + Resources.subtractFrom(usedResource, resource); + } + + private synchronized void deductAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.subtractFrom(availableResource, resource); + Resources.addTo(usedResource, resource); + } + + /** + * Reserve container for the attempt on this node. + */ + public abstract void reserveResource(SchedulerApplicationAttempt attempt, + Priority priority, RMContainer container); + + /** + * Unreserve resources on this node. + */ + public abstract void unreserveResource(SchedulerApplicationAttempt attempt); + + @Override + public String toString() { + return "host: " + rmNode.getNodeAddress() + " #containers=" + + getNumContainers() + " available=" + + getAvailableResource().getMemory() + " used=" + + getUsedResource().getMemory(); + } + + /** + * Get number of active containers on the node. + * + * @return number of active containers on the node */ - public abstract NodeId getNodeID(); + public int getNumContainers() { + return numContainers; + } + + public synchronized List getRunningContainers() { + return new ArrayList(launchedContainers.values()); + } + + public synchronized RMContainer getReservedContainer() { + return reservedContainer; + } + + protected synchronized void + setReservedContainer(RMContainer reservedContainer) { + this.reservedContainer = reservedContainer; + } + /** + * Apply delta resource on node's available resource. + * + * @param deltaResource + * the delta of resource need to apply to node + */ + public synchronized void + applyDeltaOnAvailableResource(Resource deltaResource) { + // we can only adjust available resource if total resource is changed. + Resources.addTo(this.availableResource, deltaResource); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 2348603..21eba39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -69,7 +69,15 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, @Public @Stable public List getQueueUserAclInfo(); - + + /** + * Get the whole resource capacity of the cluster. + * @return the whole resource capacity of the cluster. + */ + @LimitedPrivate("yarn") + @Unstable + public Resource getClusterResource(); + /** * Get minimum allocatable {@link Resource}. * @return minimum allocatable resource @@ -182,7 +190,7 @@ boolean checkAccess(UserGroupInformation callerUGI, @LimitedPrivate("yarn") @Unstable public RMContainer getRMContainer(ContainerId containerId); - + /** * Moves the given application to the given queue * @param appId diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e28c18c..9eed61f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -41,7 +41,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -49,11 +48,9 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -75,9 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -100,9 +95,9 @@ @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") -public class CapacityScheduler extends AbstractYarnScheduler - implements PreemptableResourceScheduler, CapacitySchedulerContext, - Configurable { +public class CapacityScheduler extends + AbstractYarnScheduler implements + PreemptableResourceScheduler, CapacitySchedulerContext, Configurable { private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); @@ -182,16 +177,8 @@ public Configuration getConf() { private Map queues = new ConcurrentHashMap(); - private Map nodes = - new ConcurrentHashMap(); - - private Resource clusterResource = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); private int numNodeManagers = 0; - private Resource minimumAllocation; - private Resource maximumAllocation; - private boolean initialized = false; private ResourceCalculator calculator; @@ -231,16 +218,6 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() { } @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; - } - - @Override public Comparator getApplicationComparator() { return applicationComparator; } @@ -264,11 +241,6 @@ public synchronized int getNumClusterNodes() { public RMContext getRMContext() { return this.rmContext; } - - @Override - public Resource getClusterResources() { - return clusterResource; - } @Override public synchronized void @@ -283,7 +255,7 @@ public Resource getClusterResources() { this.calculator = this.conf.getResourceCalculator(); this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = - new ConcurrentHashMap(); + new ConcurrentHashMap>(); initializeQueues(this.conf); @@ -536,8 +508,8 @@ private synchronized void addApplication(ApplicationId applicationId, .handle(new RMAppRejectedEvent(applicationId, ace.toString())); return; } - SchedulerApplication application = - new SchedulerApplication(queue, user); + SchedulerApplication application = + new SchedulerApplication(queue, user); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); @@ -548,7 +520,7 @@ private synchronized void addApplication(ApplicationId applicationId, private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -572,7 +544,8 @@ private synchronized void addApplicationAttempt( private synchronized void doneApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication application = applications.get(applicationId); + SchedulerApplication application = + applications.get(applicationId); if (application == null){ // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps, // ignore it. @@ -597,7 +570,7 @@ private synchronized void doneApplicationAttempt( " finalState=" + rmAppAttemptFinalState); FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); if (application == null || attempt == null) { @@ -659,7 +632,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Sanity check SchedulerUtils.normalizeRequests( - ask, getResourceCalculator(), getClusterResources(), + ask, getResourceCalculator(), getClusterResource(), getMinimumResourceCapability(), maximumAllocation); // Release containers @@ -822,7 +795,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - if (Resources.greaterThanOrEqual(calculator, getClusterResources(), + if (Resources.greaterThanOrEqual(calculator, getClusterResource(), node.getAvailableResource(), minimumAllocation)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to schedule on node: " + node.getNodeName() + @@ -942,7 +915,7 @@ private synchronized void addNode(RMNode nodeManager) { } private synchronized void removeNode(RMNode nodeInfo) { - FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); + FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID()); if (node == null) { return; } @@ -1015,28 +988,10 @@ private synchronized void completedContainer(RMContainer rmContainer, @Lock(Lock.NoLock.class) @VisibleForTesting - public FiCaSchedulerApp getApplicationAttempt( - ApplicationAttemptId applicationAttemptId) { - SchedulerApplication app = - applications.get(applicationAttemptId.getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : new SchedulerAppReport(app); - } - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( + public FiCaSchedulerApp getApplicationAttempt( ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : app.getResourceUsageReport(); + return super.getApplicationAttempt(applicationAttemptId); } @Lock(Lock.NoLock.class) @@ -1048,24 +1003,6 @@ FiCaSchedulerNode getNode(NodeId nodeId) { Map getAllNodes() { return nodes; } - - @Override - public RMContainer getRMContainer(ContainerId containerId) { - FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); - return (attempt == null) ? null : attempt.getRMContainer(containerId); - } - - @VisibleForTesting - public FiCaSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } @Override @Lock(Lock.NoLock.class) @@ -1074,12 +1011,6 @@ public void recover(RMState state) throws Exception { } @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - FiCaSchedulerNode node = getNode(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - - @Override public void dropContainerReservation(RMContainer container) { if(LOG.isDebugEnabled()){ LOG.debug("DROP_RESERVATION:" + container.toString()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 3dd4c6d..a3dbc35 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -43,7 +43,7 @@ RMContext getRMContext(); - Resource getClusterResources(); + Resource getClusterResource(); /** * Get the yarn configuration. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 971edb8..5ddb9a4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -174,12 +174,12 @@ public LeafQueue(CapacitySchedulerContext cs, int maxActiveApplications = CSQueueUtils.computeMaxActiveApplications( resourceCalculator, - cs.getClusterResources(), this.minimumAllocation, + cs.getClusterResource(), this.minimumAllocation, maxAMResourcePerQueuePercent, absoluteMaxCapacity); this.maxActiveAppsUsingAbsCap = CSQueueUtils.computeMaxActiveApplications( resourceCalculator, - cs.getClusterResources(), this.minimumAllocation, + cs.getClusterResource(), this.minimumAllocation, maxAMResourcePerQueuePercent, absoluteCapacity); int maxActiveApplicationsPerUser = CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, @@ -195,7 +195,7 @@ public LeafQueue(CapacitySchedulerContext cs, cs.getConfiguration().getAcls(getQueuePath()); setupQueueConfigs( - cs.getClusterResources(), + cs.getClusterResource(), capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 1f09475..dba92a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -143,7 +143,7 @@ public ParentQueue(CapacitySchedulerContext cs, this.queueInfo.setQueueName(queueName); this.queueInfo.setChildQueues(new ArrayList()); - setupQueueConfigs(cs.getClusterResources(), + setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, state, acls); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 58f12d0..7bab760 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -18,248 +18,84 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.util.resource.Resources; public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private Resource availableResource = recordFactory.newRecordInstance(Resource.class); - private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - private Resource totalResourceCapability; - - private volatile int numContainers; - - private RMContainer reservedContainer; - - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap(); - - private final RMNode rmNode; - private final String nodeName; - public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { - this.rmNode = node; - this.availableResource.setMemory(node.getTotalCapability().getMemory()); - this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores()); - totalResourceCapability = - Resource.newInstance(node.getTotalCapability().getMemory(), node - .getTotalCapability().getVirtualCores()); - if (usePortForNodeName) { - nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); - } else { - nodeName = rmNode.getHostName(); - } - } - - public RMNode getRMNode() { - return this.rmNode; - } - - public NodeId getNodeID() { - return this.rmNode.getNodeID(); - } - - public String getHttpAddress() { - return this.rmNode.getHttpAddress(); + super(node, usePortForNodeName); } @Override - public String getNodeName() { - return nodeName; - } - - @Override - public String getRackName() { - return this.rmNode.getRackName(); - } - - /** - * The Scheduler has allocated containers on this node to the - * given application. - * - * @param applicationId application - * @param rmContainer allocated container - */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { - Container container = rmContainer.getContainer(); - deductAvailableResource(container.getResource()); - ++numContainers; - - launchedContainers.put(container.getId(), rmContainer); - - LOG.info("Assigned container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + - getAvailableResource() + " available"); - } - - @Override - public synchronized Resource getAvailableResource() { - return this.availableResource; - } - - @Override - public synchronized Resource getUsedResource() { - return this.usedResource; - } - - @Override - public Resource getTotalResource() { - return this.totalResourceCapability; - } - - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) - return true; - return false; - } - - private synchronized void updateResource(Container container) { - addAvailableResource(container.getResource()); - --numContainers; - } - - /** - * Release an allocated container on this node. - * @param container container to be released - */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { - LOG.error("Invalid container released " + container); - return; - } - - /* remove the containers from the nodemanger */ - if (null != launchedContainers.remove(container.getId())) { - updateResource(container); - } - - LOG.info("Released container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + getAvailableResource() - + " available" + ", release resources=" + true); - } - - - private synchronized void addAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(availableResource, resource); - Resources.subtractFrom(usedResource, resource); - } - - private synchronized void deductAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(availableResource, resource); - Resources.addTo(usedResource, resource); - } - - @Override - public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource().getMemory() + - " used=" + getUsedResource().getMemory(); - } - - @Override - public int getNumContainers() { - return numContainers; - } - - public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); - } - public synchronized void reserveResource( - SchedulerApplicationAttempt application, Priority priority, - RMContainer reservedContainer) { + SchedulerApplicationAttempt application, Priority priority, + RMContainer container) { // Check if it's already reserved - if (this.reservedContainer != null) { + RMContainer reservedContainer = getReservedContainer(); + if (reservedContainer != null) { // Sanity check - if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { + if (!container.getContainer().getNodeId().equals(getNodeID())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + - " on node " + reservedContainer.getReservedNode() + - " when currently" + " reserved resource " + this.reservedContainer + - " on node " + this.reservedContainer.getReservedNode()); + " container " + container + + " on node " + container.getReservedNode() + + " when currently" + " reserved resource " + reservedContainer + + " on node " + reservedContainer.getReservedNode()); } // Cannot reserve more than one application attempt on a given node! // Reservation is still against attempt. - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( - reservedContainer.getContainer().getId().getApplicationAttemptId())) { + if (!reservedContainer.getContainer().getId().getApplicationAttemptId() + .equals(container.getContainer().getId().getApplicationAttemptId())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + + " container " + container + " for application " + application.getApplicationAttemptId() + " when currently" + - " reserved container " + this.reservedContainer + + " reserved container " + reservedContainer + " on node " + this); } if (LOG.isDebugEnabled()) { LOG.debug("Updated reserved container " - + reservedContainer.getContainer().getId() + " on node " + this + + container.getContainer().getId() + " on node " + this + " for application attempt " + application.getApplicationAttemptId()); } } else { if (LOG.isDebugEnabled()) { LOG.debug("Reserved container " - + reservedContainer.getContainer().getId() + " on node " + this + + container.getContainer().getId() + " on node " + this + " for application attempt " + application.getApplicationAttemptId()); } } - this.reservedContainer = reservedContainer; + setReservedContainer(container); } + @Override public synchronized void unreserveResource( SchedulerApplicationAttempt application) { - + // adding NP checks as this can now be called for preemption - if (reservedContainer != null - && reservedContainer.getContainer() != null - && reservedContainer.getContainer().getId() != null - && reservedContainer.getContainer().getId().getApplicationAttemptId() != null) { + if (getReservedContainer() != null + && getReservedContainer().getContainer() != null + && getReservedContainer().getContainer().getId() != null + && getReservedContainer().getContainer().getId() + .getApplicationAttemptId() != null) { // Cannot unreserve for wrong application... ApplicationAttemptId reservedApplication = - reservedContainer.getContainer().getId().getApplicationAttemptId(); + getReservedContainer().getContainer().getId() + .getApplicationAttemptId(); if (!reservedApplication.equals( application.getApplicationAttemptId())) { throw new IllegalStateException("Trying to unreserve " + @@ -269,17 +105,6 @@ public synchronized void unreserveResource( " on node " + this); } } - reservedContainer = null; - } - - public synchronized RMContainer getReservedContainer() { - return reservedContainer; + setReservedContainer(null); } - - @Override - public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { - // we can only adjust available resource if total resource is changed. - Resources.addTo(this.availableResource, deltaResource); - } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index c265fcf..1e94046 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -119,9 +119,9 @@ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) { // TODO: we might change these queue metrics around a little bit // to match the semantics of the fair scheduler. queueInfo.setCapacity((float) getFairShare().getMemory() / - scheduler.getClusterCapacity().getMemory()); + scheduler.getClusterResource().getMemory()); queueInfo.setCapacity((float) getResourceUsage().getMemory() / - scheduler.getClusterCapacity().getMemory()); + scheduler.getClusterResource().getMemory()); ArrayList childQueueInfos = new ArrayList(); if (includeChildQueues) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 97ea6d4..69f2ab3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -18,28 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable @@ -47,208 +35,56 @@ private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private Resource availableResource; - private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - private Resource totalResourceCapability; - - private volatile int numContainers; - - private RMContainer reservedContainer; private AppSchedulable reservedAppSchedulable; - - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap(); - - private final RMNode rmNode; - private final String nodeName; public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { - this.rmNode = node; - this.availableResource = Resources.clone(node.getTotalCapability()); - totalResourceCapability = - Resource.newInstance(node.getTotalCapability().getMemory(), node - .getTotalCapability().getVirtualCores()); - if (usePortForNodeName) { - nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); - } else { - nodeName = rmNode.getHostName(); - } - } - - public RMNode getRMNode() { - return rmNode; - } - - public NodeId getNodeID() { - return rmNode.getNodeID(); - } - - public String getHttpAddress() { - return rmNode.getHttpAddress(); - } - - @Override - public String getNodeName() { - return nodeName; + super(node, usePortForNodeName); } @Override - public String getRackName() { - return rmNode.getRackName(); - } - - /** - * The Scheduler has allocated containers on this node to the - * given application. - * - * @param applicationId application - * @param rmContainer allocated container - */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { - Container container = rmContainer.getContainer(); - deductAvailableResource(container.getResource()); - ++numContainers; - - launchedContainers.put(container.getId(), rmContainer); - - LOG.info("Assigned container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + - getAvailableResource() + " available"); - } - - @Override - public synchronized Resource getAvailableResource() { - return availableResource; - } - - @Override - public synchronized Resource getUsedResource() { - return usedResource; - } - - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) { - return true; - } - return false; - } - - private synchronized void updateResource(Container container) { - addAvailableResource(container.getResource()); - --numContainers; - } - - /** - * Release an allocated container on this node. - * @param container container to be released - */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { - LOG.error("Invalid container released " + container); - return; - } - - /* remove the containers from the nodemanger */ - launchedContainers.remove(container.getId()); - updateResource(container); - - LOG.info("Released container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + getAvailableResource() - + " available" + ", release resources=" + true); - } - - - private synchronized void addAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(availableResource, resource); - Resources.subtractFrom(usedResource, resource); - } - - @Override - public Resource getTotalResource() { - return this.totalResourceCapability; - } - - private synchronized void deductAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(availableResource, resource); - Resources.addTo(usedResource, resource); - } - - @Override - public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource() + - " used=" + getUsedResource(); - } - - @Override - public int getNumContainers() { - return numContainers; - } - - public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); - } - public synchronized void reserveResource( - FSSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + SchedulerApplicationAttempt application, Priority priority, + RMContainer container) { // Check if it's already reserved - if (this.reservedContainer != null) { + RMContainer reservedContainer = getReservedContainer(); + if (reservedContainer != null) { // Sanity check - if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { + if (!container.getContainer().getNodeId().equals(getNodeID())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + - " on node " + reservedContainer.getReservedNode() + - " when currently" + " reserved resource " + this.reservedContainer + - " on node " + this.reservedContainer.getReservedNode()); + " container " + container + + " on node " + container.getReservedNode() + + " when currently" + " reserved resource " + reservedContainer + + " on node " + reservedContainer.getReservedNode()); } // Cannot reserve more than one application on a given node! - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( - reservedContainer.getContainer().getId().getApplicationAttemptId())) { + if (!reservedContainer.getContainer().getId().getApplicationAttemptId() + .equals(container.getContainer().getId().getApplicationAttemptId())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + + " container " + container + " for application " + application.getApplicationId() + " when currently" + - " reserved container " + this.reservedContainer + + " reserved container " + reservedContainer + " on node " + this); } LOG.info("Updated reserved container " + - reservedContainer.getContainer().getId() + " on node " + + container.getContainer().getId() + " on node " + this + " for application " + application); } else { - LOG.info("Reserved container " + reservedContainer.getContainer().getId() + + LOG.info("Reserved container " + container.getContainer().getId() + " on node " + this + " for application " + application); } - this.reservedContainer = reservedContainer; - this.reservedAppSchedulable = application.getAppSchedulable(); + setReservedContainer(container); + this.reservedAppSchedulable = ((FSSchedulerApp) application).getAppSchedulable(); } + @Override public synchronized void unreserveResource( - FSSchedulerApp application) { + SchedulerApplicationAttempt application) { // Cannot unreserve for wrong application... ApplicationAttemptId reservedApplication = - reservedContainer.getContainer().getId().getApplicationAttemptId(); + getReservedContainer().getContainer().getId().getApplicationAttemptId(); if (!reservedApplication.equals( application.getApplicationAttemptId())) { throw new IllegalStateException("Trying to unreserve " + @@ -258,22 +94,11 @@ public synchronized void unreserveResource( " on node " + this); } - this.reservedContainer = null; + setReservedContainer(null); this.reservedAppSchedulable = null; } - public synchronized RMContainer getReservedContainer() { - return reservedContainer; - } - public synchronized AppSchedulable getReservedAppSchedulable() { return reservedAppSchedulable; } - - @Override - public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { - // we can only adjust available resource if total resource is changed. - Resources.addTo(this.availableResource, deltaResource); - } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fab9ebe..5eeda64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -39,7 +39,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -53,7 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -76,10 +74,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -122,11 +118,11 @@ @LimitedPrivate("yarn") @Unstable @SuppressWarnings("unchecked") -public class FairScheduler extends AbstractYarnScheduler { +public class FairScheduler extends + AbstractYarnScheduler { private boolean initialized; private FairSchedulerConfiguration conf; - private Resource minimumAllocation; - private Resource maximumAllocation; + private Resource incrAllocation; private QueueManager queueMgr; private Clock clock; @@ -152,14 +148,6 @@ // Time we last ran preemptTasksIfNecessary private long lastPreemptCheckTime; - // Nodes in the cluster, indexed by NodeId - private Map nodes = - new ConcurrentHashMap(); - - // Aggregate capacity of the cluster - private Resource clusterCapacity = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); - // How often tasks are preempted protected long preemptionInterval; @@ -246,23 +234,6 @@ public QueueManager getQueueManager() { return queueMgr; } - @Override - public RMContainer getRMContainer(ContainerId containerId) { - FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId); - return (attempt == null) ? null : attempt.getRMContainer(containerId); - } - - private FSSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - /** * A runnable which calls {@link FairScheduler#update()} every * UPDATE_INTERVAL milliseconds. @@ -294,7 +265,7 @@ protected synchronized void update() { // Recursively update demands for all queues rootQueue.updateDemand(); - rootQueue.setFairShare(clusterCapacity); + rootQueue.setFairShare(clusterResource); // Recursively compute fair shares for all queues // and update metrics rootQueue.recomputeShares(); @@ -322,9 +293,9 @@ private void updatePreemptionVariables() { * Is a queue below its min share for the given task type? */ boolean isStarvedForMinShare(FSLeafQueue sched) { - Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredShare); } @@ -333,9 +304,9 @@ boolean isStarvedForMinShare(FSLeafQueue sched) { * defined as being below half its fair share. */ boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredFairShare); } @@ -362,7 +333,7 @@ protected synchronized void preemptTasksIfNecessary() { for (FSLeafQueue sched : queueMgr.getLeafQueues()) { resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); } - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { preemptResources(queueMgr.getLeafQueues(), resToPreempt); } @@ -389,7 +360,7 @@ protected void preemptResources(Collection scheds, // Collect running containers from over-scheduled queues List runningContainers = new ArrayList(); for (FSLeafQueue sched : scheds) { - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), sched.getFairShare())) { for (AppSchedulable as : sched.getRunnableAppSchedulables()) { for (RMContainer c : as.getApp().getLiveContainers()) { @@ -421,7 +392,7 @@ public int compare(RMContainer c1, RMContainer c2) { while (warnedIter.hasNext()) { RMContainer container = warnedIter.next(); if (container.getState() == RMContainerState.RUNNING && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, toPreempt, Resources.none())) { warnOrKillContainer(container, apps.get(container), queues.get(container)); preemptedThisRound.add(container); @@ -435,12 +406,12 @@ public int compare(RMContainer c1, RMContainer c2) { // sure we don't preempt too many from any queue Iterator runningIter = runningContainers.iterator(); while (runningIter.hasNext() && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, toPreempt, Resources.none())) { RMContainer container = runningIter.next(); FSLeafQueue sched = queues.get(container); if (!preemptedThisRound.contains(container) && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), sched.getFairShare())) { warnOrKillContainer(container, apps.get(container), sched); @@ -496,20 +467,20 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); - resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToMinShare, resDueToFairShare); - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { String message = "Should preempt " + resToPreempt + " res for queue " + sched.getName() + ": resDueToMinShare = " + resDueToMinShare @@ -540,18 +511,12 @@ public synchronized ResourceWeights getAppWeight(AppSchedulable app) { return resourceWeights; } - @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - public Resource getIncrementResourceCapability() { return incrAllocation; } - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; + private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) { + return nodes.get(nodeId); } public double getNodeLocalityThreshold() { @@ -578,10 +543,6 @@ public synchronized int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } - public Resource getClusterCapacity() { - return clusterCapacity; - } - public synchronized Clock getClock() { return clock; } @@ -629,8 +590,8 @@ protected synchronized void addApplication(ApplicationId applicationId, return; } - SchedulerApplication application = - new SchedulerApplication(queue, user); + SchedulerApplication application = + new SchedulerApplication(queue, user); applications.put(applicationId, application); queue.getMetrics().submitApp(user); @@ -647,7 +608,7 @@ protected synchronized void addApplication(ApplicationId applicationId, protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); FSLeafQueue queue = (FSLeafQueue) application.getQueue(); @@ -720,7 +681,8 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { private synchronized void removeApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication application = applications.get(applicationId); + SchedulerApplication application = + applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); return; @@ -734,7 +696,7 @@ private synchronized void removeApplicationAttempt( RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); @@ -809,7 +771,7 @@ private synchronized void completedContainer(RMContainer rmContainer, } // Get the node on which the container was allocated - FSSchedulerNode node = nodes.get(container.getNodeId()); + FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { application.unreserve(node, rmContainer.getReservedPriority()); @@ -827,20 +789,20 @@ private synchronized void completedContainer(RMContainer rmContainer, private synchronized void addNode(RMNode node) { nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); - Resources.addTo(clusterCapacity, node.getTotalCapability()); + Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); LOG.info("Added node " + node.getNodeAddress() + - " cluster capacity: " + clusterCapacity); + " cluster capacity: " + clusterResource); } private synchronized void removeNode(RMNode rmNode) { - FSSchedulerNode node = nodes.get(rmNode.getNodeID()); + FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID()); // This can occur when an UNHEALTHY node reconnects if (node == null) { return; } - Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); + Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); updateRootQueueMetrics(); // Remove running containers @@ -865,7 +827,7 @@ private synchronized void removeNode(RMNode rmNode) { nodes.remove(rmNode.getNodeID()); LOG.info("Removed node " + rmNode.getNodeAddress() + - " cluster capacity: " + clusterCapacity); + " cluster capacity: " + clusterResource); } @Override @@ -882,7 +844,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation); + clusterResource, minimumAllocation, maximumAllocation, incrAllocation); // Release containers for (ContainerId releasedContainerId : release) { @@ -961,13 +923,13 @@ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode no */ private synchronized void nodeUpdate(RMNode nm) { if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); + LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = nodes.get(nm.getNodeID()); + FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); + SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); @@ -1012,7 +974,7 @@ private void continuousScheduling() { // iterate all nodes for (NodeId nodeId : nodeIdList) { if (nodes.containsKey(nodeId)) { - FSSchedulerNode node = nodes.get(nodeId); + FSSchedulerNode node = getFSSchedulerNode(nodeId); try { if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) { @@ -1038,7 +1000,7 @@ private void continuousScheduling() { @Override public int compare(NodeId n1, NodeId n2) { - return RESOURCE_CALCULATOR.compare(clusterCapacity, + return RESOURCE_CALCULATOR.compare(clusterResource, nodes.get(n2).getAvailableResource(), nodes.get(n1).getAvailableResource()); } @@ -1075,7 +1037,7 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { int assignedContainers = 0; while (node.getReservedContainer() == null) { boolean assignedContainer = false; - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, queueMgr.getRootQueue().assignContainer(node), Resources.none())) { assignedContainers++; @@ -1089,45 +1051,8 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { updateRootQueueMetrics(); } - @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - FSSchedulerNode node = nodes.get(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - SchedulerApplication app = - applications.get(appAttemptId.getApplicationId()); - if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); - if (attempt == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); - } - return null; - } - return new SchedulerAppReport(attempt); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); - if (attempt == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); - } - return null; - } - return attempt.getResourceUsageReport(); + return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId); } /** @@ -1139,7 +1064,7 @@ public ApplicationResourceUsageReport getAppResourceUsageReport( private void updateRootQueueMetrics() { rootMetrics.setAvailableResourcesToQueue( Resources.subtract( - clusterCapacity, rootMetrics.getAllocatedResources())); + clusterResource, rootMetrics.getAllocatedResources())); } @Override @@ -1258,7 +1183,7 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext) this.rmContext = rmContext; // This stores per-application scheduling information this.applications = - new ConcurrentHashMap(); + new ConcurrentHashMap>(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); @@ -1365,7 +1290,7 @@ public void onReload(AllocationConfiguration queueInfo) { // if it does not already exist, so it can be displayed on the web UI. synchronized (FairScheduler.this) { allocConf = queueInfo; - allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity); + allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); queueMgr.updateAllocationConfiguration(allocConf); } } @@ -1385,7 +1310,7 @@ public void onReload(AllocationConfiguration queueInfo) { @Override public synchronized String moveApplication(ApplicationId appId, String queueName) throws YarnException { - SchedulerApplication app = applications.get(appId); + SchedulerApplication app = applications.get(appId); if (app == null) { throw new YarnException("App to be moved " + appId + " not found."); } @@ -1449,8 +1374,8 @@ private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app, * Helper for moveApplication, which has appropriate synchronization, so all * operations will be atomic. */ - private void executeMove(SchedulerApplication app, FSSchedulerApp attempt, - FSLeafQueue oldQueue, FSLeafQueue newQueue) { + private void executeMove(SchedulerApplication app, + FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { boolean wasRunnable = oldQueue.removeApp(attempt); // if app was not runnable before, it may be runnable now boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 44918f3..4f8735b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -369,7 +369,7 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { // Set scheduling policies try { SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); - policy.initialize(scheduler.getClusterCapacity()); + policy.initialize(scheduler.getClusterResource()); queue.setPolicy(policy); } catch (AllocationConfigurationException ex) { LOG.warn("Cannot apply configured scheduling policy to queue " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 21fcdec..bc3441b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; @@ -38,7 +37,6 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -76,11 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -104,7 +100,8 @@ @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") -public class FifoScheduler extends AbstractYarnScheduler implements +public class FifoScheduler extends + AbstractYarnScheduler implements Configurable { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); @@ -114,11 +111,7 @@ Configuration conf; - protected Map nodes = new ConcurrentHashMap(); - private boolean initialized; - private Resource minimumAllocation; - private Resource maximumAllocation; private boolean usePortForNodeName; private ActiveUsersManager activeUsersManager; @@ -218,19 +211,9 @@ public synchronized Configuration getConf() { } @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - - @Override public int getNumClusterNodes() { return nodes.size(); } - - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; - } @Override public synchronized void @@ -242,7 +225,7 @@ public Resource getMaximumResourceCapability() { this.rmContext = rmContext; //Use ConcurrentSkipListMap because applications need to be ordered this.applications = - new ConcurrentSkipListMap(); + new ConcurrentSkipListMap>(); this.minimumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, @@ -332,30 +315,6 @@ public Allocation allocate( } } - @VisibleForTesting - FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { - SchedulerApplication app = - applications.get(applicationAttemptId.getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : new SchedulerAppReport(app); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : app.getResourceUsageReport(); - } - private FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } @@ -363,8 +322,8 @@ private FiCaSchedulerNode getNode(NodeId nodeId) { @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user) { - SchedulerApplication application = - new SchedulerApplication(DEFAULT_QUEUE, user); + SchedulerApplication application = + new SchedulerApplication(DEFAULT_QUEUE, user); applications.put(applicationId, application); metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user @@ -377,7 +336,7 @@ public synchronized void addApplication(ApplicationId applicationId, public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, boolean transferStateFromPreviousAttempt) { - SchedulerApplication application = + SchedulerApplication application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); // TODO: Fix store @@ -401,7 +360,8 @@ public synchronized void addApplication(ApplicationId applicationId, private synchronized void doneApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication application = applications.get(applicationId); + SchedulerApplication application = + applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); return; @@ -419,7 +379,7 @@ private synchronized void doneApplicationAttempt( RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) throws IOException { FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); if (application == null || attempt == null) { throw new IOException("Unknown application " + applicationAttemptId + @@ -456,13 +416,13 @@ private void assignContainers(FiCaSchedulerNode node) { " #applications=" + applications.size()); // Try to assign containers to applications in fifo order - for (Map.Entry e : applications + for (Map.Entry> e : applications .entrySet()) { - FiCaSchedulerApp application = - (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt(); + FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt(); if (application == null) { continue; } + LOG.debug("pre-assignContainers"); application.showRequests(); synchronized (application) { @@ -499,7 +459,7 @@ private void assignContainers(FiCaSchedulerNode node) { // Update the applications' headroom to correctly take into // account the containers assigned in this update. - for (SchedulerApplication application : applications.values()) { + for (SchedulerApplication application : applications.values()) { FiCaSchedulerApp attempt = (FiCaSchedulerApp) application.getCurrentAppAttempt(); if (attempt == null) { @@ -864,7 +824,6 @@ private synchronized void containerCompleted(RMContainer rmContainer, } - private Resource clusterResource = recordFactory.newRecordInstance(Resource.class); private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { @@ -911,28 +870,11 @@ public void recover(RMState state) { } @Override - public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) { - FiCaSchedulerNode node = getNode(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - - @Override public RMContainer getRMContainer(ContainerId containerId) { FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); return (attempt == null) ? null : attempt.getRMContainer(containerId); } - private FiCaSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - @Override public QueueMetrics getRootQueueMetrics() { return DEFAULT_QUEUE.getMetrics(); @@ -943,13 +885,14 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { return DEFAULT_QUEUE.hasAccess(acl, callerUGI); } - + @Override - public synchronized List getAppsInQueue(String queueName) { + public synchronized List + getAppsInQueue(String queueName) { if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { - List attempts = new ArrayList( - applications.size()); - for (SchedulerApplication app : applications.values()) { + List attempts = + new ArrayList(applications.size()); + for (SchedulerApplication app : applications.values()) { attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId()); } return attempts; @@ -957,5 +900,4 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, return null; } } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index 1c5a79b..2c1bc47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -70,7 +70,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { queueName = queue.getName(); schedulingPolicy = queue.getPolicy().getName(); - clusterResources = new ResourceInfo(scheduler.getClusterCapacity()); + clusterResources = new ResourceInfo(scheduler.getClusterResource()); usedResources = new ResourceInfo(queue.getResourceUsage()); fractionMemUsed = (float)usedResources.getMemory() / @@ -81,7 +81,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { maxResources = new ResourceInfo(queue.getMaxShare()); maxResources = new ResourceInfo( Resources.componentwiseMin(queue.getMaxShare(), - scheduler.getClusterCapacity())); + scheduler.getClusterResource())); fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory(); fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 99b2f2e..6a449f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -456,7 +456,7 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { Resource clusterResources = Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); - when(mCS.getClusterResources()).thenReturn(clusterResources); + when(mCS.getClusterResource()).thenReturn(clusterResources); return policy; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index 8fcbf54..460f35e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -384,15 +384,18 @@ public void testCreatePreemptedContainerStatus() { Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus()); } - public static SchedulerApplication verifyAppAddedAndRemovedFromScheduler( - final Map applications, - EventHandler handler, String queueName) throws Exception { + public static SchedulerApplication + verifyAppAddedAndRemovedFromScheduler( + Map> applications, + EventHandler handler, String queueName) + throws Exception { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(appId, queueName, "user"); handler.handle(appAddedEvent); - SchedulerApplication app = applications.get(appId); + SchedulerApplication app = + applications.get(appId); // verify application is added. Assert.assertNotNull(app); Assert.assertEquals("user", app.getUser()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 2b548ef..a9a9975 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -81,7 +81,7 @@ public void setUp() throws IOException { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); @@ -165,7 +165,7 @@ public void testLimitsComputation() throws Exception { // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16); - when(csContext.getClusterResources()).thenReturn(clusterResource); + when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); CSQueue root = @@ -478,7 +478,7 @@ public void testHeadroom() throws Exception { // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB); - when(csContext.getClusterResources()).thenReturn(clusterResource); + when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); CapacityScheduler.parseQueue(csContext, csConf, null, "root", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index d0ba334..1412039 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -29,8 +29,6 @@ import java.util.Comparator; import java.util.List; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -56,8 +54,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -465,14 +467,14 @@ public void testReconnectedNode() throws Exception { cs.handle(new NodeAddedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n2)); - Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory()); + Assert.assertEquals(6 * GB, cs.getClusterResource().getMemory()); // reconnect n1 with downgraded memory n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); cs.handle(new NodeRemovedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n1)); - Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); + Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory()); } @Test @@ -627,17 +629,17 @@ public void testGetAppsInQueue() throws Exception { @Test public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { - - AsyncDispatcher rmDispatcher = new AsyncDispatcher(); - CapacityScheduler cs = new CapacityScheduler(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); - cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null, - null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + @SuppressWarnings("unchecked") + AbstractYarnScheduler cs = + (AbstractYarnScheduler) rm + .getResourceScheduler(); - SchedulerApplication app = + SchedulerApplication app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( cs.getSchedulerApplications(), cs, "a1"); Assert.assertEquals("a1", app.getQueue().getQueueName()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index d509771..fd14ef6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -89,7 +89,7 @@ public void setUp() throws Exception { Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index bdf89bb..2a26d30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -122,7 +122,7 @@ public void setUp() throws Exception { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); @@ -1651,7 +1651,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { newQueues, queues, TestUtils.spyHook); queues = newQueues; - root.reinitialize(newRoot, cs.getClusterResources()); + root.reinitialize(newRoot, cs.getClusterResource()); // after reinitialization assertEquals(3, e.activeApplications.size()); @@ -1676,7 +1676,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception { newQueues, queues, TestUtils.spyHook); queues = newQueues; - root.reinitialize(newRoot, cs.getClusterResources()); + root.reinitialize(newRoot, cs.getClusterResource()); // after reinitialization assertEquals(60, e.getNodeLocalityDelay()); @@ -2070,7 +2070,7 @@ private CapacitySchedulerContext mockCSContext( when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConf()).thenReturn(new YarnConfiguration()); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getClusterResources()).thenReturn(clusterResource); + when(csContext.getClusterResource()).thenReturn(clusterResource); when(csContext.getMinimumResourceCapability()).thenReturn( Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 652fcf1..fa9edb1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -86,7 +86,7 @@ public void setUp() throws Exception { Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2524763..fe2cb23 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -43,7 +43,6 @@ import javax.xml.parsers.ParserConfigurationException; -import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -80,8 +79,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -95,12 +97,14 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.xml.sax.SAXException; import com.google.common.collect.Sets; +@SuppressWarnings("unchecked") public class TestFairScheduler { static class MockClock implements Clock { @@ -377,19 +381,19 @@ public void testAggregateCapacityTracking() throws Exception { .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - assertEquals(1024, scheduler.getClusterCapacity().getMemory()); + assertEquals(1024, scheduler.getClusterResource().getMemory()); // Add another node RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); - assertEquals(1536, scheduler.getClusterCapacity().getMemory()); + assertEquals(1536, scheduler.getClusterResource().getMemory()); // Remove the first node NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1); scheduler.handle(nodeEvent3); - assertEquals(512, scheduler.getClusterCapacity().getMemory()); + assertEquals(512, scheduler.getClusterResource().getMemory()); } @Test @@ -2123,7 +2127,7 @@ public void testBasicDRFAssignment() throws Exception { FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); @@ -2167,7 +2171,7 @@ public void testBasicDRFWithQueues() throws Exception { FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); @@ -2210,7 +2214,7 @@ public void testDRFHierarchicalQueues() throws Exception { FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy); @@ -2466,8 +2470,8 @@ public void testContinuousScheduling() throws Exception { fs.handle(nodeEvent2); // available resource - Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024); - Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16); + Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024); + Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16); // send application request ApplicationAttemptId appAttemptId = @@ -2647,8 +2651,9 @@ public void testGetAppsInQueue() throws Exception { @Test public void testAddAndRemoveAppFromFairScheduler() throws Exception { - FairScheduler scheduler = - (FairScheduler) resourceManager.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) resourceManager + .getResourceScheduler(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( scheduler.getSchedulerApplications(), scheduler, "default"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index f6dfc3f..f5bfc37 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - import static org.mockito.Mockito.mock; import java.io.IOException; @@ -30,8 +29,6 @@ import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -60,13 +57,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -78,6 +75,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -594,9 +592,12 @@ public void testGetAppsInQueue() throws Exception { public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { Configuration conf = new Configuration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, - ResourceScheduler.class); + ResourceScheduler.class); MockRM rm = new MockRM(conf); - FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); + @SuppressWarnings("unchecked") + AbstractYarnScheduler fs = + (AbstractYarnScheduler) rm + .getResourceScheduler(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( fs.getSchedulerApplications(), fs, "queue"); }