diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index f923733..046b213 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -36,6 +36,8 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -872,4 +874,11 @@ public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException {
return scheduler.moveApplication(appId, newQueue);
}
+
+ @Override
+ @LimitedPrivate("yarn")
+ @Unstable
+ public Resource getClusterResource() {
+ return null;
+ }
}
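
The wrapper above satisfies the enlarged interface with a null stub. For comparison only, a delegating variant is sketched below; it assumes the wrapped scheduler field named "scheduler" (visible in the moveApplication hunk above) should be consulted, which this patch does not do:

    @Override
    @LimitedPrivate("yarn")
    @Unstable
    public Resource getClusterResource() {
      // hypothetical alternative to the null stub: forward to the wrapped
      // scheduler, mirroring how moveApplication() delegates above
      return scheduler.getClusterResource();
    }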
diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 0fa1a9e..79ce746 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -142,6 +142,17 @@
+
+
+
+
+
+
+
+
+
+
+
@@ -178,12 +189,6 @@
-
-
-
-
-
-
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 684c82b..f94aedbf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -170,7 +170,7 @@ public void init(Configuration config,
public void editSchedule(){
CSQueue root = scheduler.getRootQueue();
Resource clusterResources =
- Resources.clone(scheduler.getClusterResources());
+ Resources.clone(scheduler.getClusterResource());
containerBasedPreemptOrKill(root, clusterResources);
}
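
The policy clones the scheduler's total before running the preemption math: Resources.clone hands back a fresh Resource instance, so mutating the working copy leaves the scheduler's live bookkeeping untouched. A standalone sketch of that behavior, using only the public Resource/Resources helpers:

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    class CloneSketch {
      public static void main(String[] args) {
        Resource live = Resource.newInstance(8192, 8);   // cluster total
        Resource copy = Resources.clone(live);           // defensive copy
        // preemption math may subtract from its working copy...
        Resources.subtractFrom(copy, Resource.newInstance(1024, 1));
        // ...while the live total is unaffected
        System.out.println("live=" + live + " copy=" + copy);
      }
    }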
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 0f3af41..642cd31 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -22,21 +22,41 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
-public abstract class AbstractYarnScheduler implements ResourceScheduler {
+public abstract class AbstractYarnScheduler
+ <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
+ implements ResourceScheduler {
+
+ private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
+
+ // Nodes in the cluster, indexed by NodeId
+ protected Map<NodeId, N> nodes =
+ new ConcurrentHashMap<NodeId, N>();
+
+ // Whole capacity of the cluster
+ protected Resource clusterResource = Resource.newInstance(0, 0);
+
+ protected Resource minimumAllocation;
+ protected Resource maximumAllocation;
protected RMContext rmContext;
- protected Map<ApplicationId, SchedulerApplication> applications;
+ protected Map<ApplicationId, SchedulerApplication<T>> applications;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
@@ -45,7 +65,7 @@
public synchronized List getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
- SchedulerApplication app = applications.get(appId);
+ SchedulerApplication<T> app = applications.get(appId);
List<Container> containerList = new ArrayList<Container>();
RMApp appImpl = this.rmContext.getRMApps().get(appId);
if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
@@ -64,10 +84,75 @@
return containerList;
}
- public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
+ public Map<ApplicationId, SchedulerApplication<T>>
+ getSchedulerApplications() {
return applications;
}
-
+
+ @Override
+ public Resource getClusterResource() {
+ return clusterResource;
+ }
+
+ @Override
+ public Resource getMinimumResourceCapability() {
+ return minimumAllocation;
+ }
+
+ @Override
+ public Resource getMaximumResourceCapability() {
+ return maximumAllocation;
+ }
+
+ public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication<T> app =
+ applications.get(applicationAttemptId.getApplicationId());
+ return app == null ? null : app.getCurrentAppAttempt();
+ }
+
+ @Override
+ public SchedulerAppReport getSchedulerAppInfo(
+ ApplicationAttemptId appAttemptId) {
+ SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
+ if (attempt == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
+ }
+ return null;
+ }
+ return new SchedulerAppReport(attempt);
+ }
+
+ @Override
+ public ApplicationResourceUsageReport getAppResourceUsageReport(
+ ApplicationAttemptId appAttemptId) {
+ SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
+ if (attempt == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
+ }
+ return null;
+ }
+ return attempt.getResourceUsageReport();
+ }
+
+ public T getCurrentAttemptForContainer(ContainerId containerId) {
+ return getApplicationAttempt(containerId.getApplicationAttemptId());
+ }
+
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
+ SchedulerApplicationAttempt attempt =
+ getCurrentAttemptForContainer(containerId);
+ return (attempt == null) ? null : attempt.getRMContainer(containerId);
+ }
+
+ @Override
+ public SchedulerNodeReport getNodeReport(NodeId nodeId) {
+ N node = nodes.get(nodeId);
+ return node == null ? null : new SchedulerNodeReport(node);
+ }
+
@Override
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException {
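
The two type parameters on AbstractYarnScheduler carry each scheduler's concrete attempt and node types through the shared code. A self-contained toy replica of that shape (plain Java, standing in for the YARN classes):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // T stands in for the attempt type, N for the node type, as in
    // AbstractYarnScheduler<T extends SchedulerApplicationAttempt,
    //                       N extends SchedulerNode>
    abstract class SchedulerSketch<T, N> {
      protected final Map<String, N> nodes =
          new ConcurrentHashMap<String, N>();
      protected final Map<String, AppSketch<T>> applications =
          new ConcurrentHashMap<String, AppSketch<T>>();

      // mirrors getApplicationAttempt(): subclasses receive their own T
      // back without the downcasts the schedulers used before this patch
      T getApplicationAttempt(String appId) {
        AppSketch<T> app = applications.get(appId);
        return app == null ? null : app.getCurrentAppAttempt();
      }
    }

    class AppSketch<T> {
      private T currentAttempt;
      T getCurrentAppAttempt() { return currentAttempt; }
      void setCurrentAppAttempt(T attempt) { currentAttempt = attempt; }
    }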
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
index 4d6ca0e..2c788aa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
@@ -23,11 +23,11 @@
@Private
@Unstable
-public class SchedulerApplication {
+public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
private Queue queue;
private final String user;
- private SchedulerApplicationAttempt currentAttempt;
+ private T currentAttempt;
public SchedulerApplication(Queue queue, String user) {
this.queue = queue;
@@ -46,11 +46,11 @@ public String getUser() {
return user;
}
- public SchedulerApplicationAttempt getCurrentAppAttempt() {
+ public T getCurrentAppAttempt() {
return currentAttempt;
}
- public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
+ public void setCurrentAppAttempt(T currentAttempt) {
this.currentAttempt = currentAttempt;
}
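
Once SchedulerApplication is generic in its attempt type, callers get the concrete type back statically. A sketch, assuming an already-constructed queue and FiCaSchedulerApp attempt (the same pattern the CapacityScheduler hunks below use):

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;

    class TypedAttemptSketch {
      static FiCaSchedulerApp track(Queue queue, String user,
          FiCaSchedulerApp attempt) {
        SchedulerApplication<FiCaSchedulerApp> app =
            new SchedulerApplication<FiCaSchedulerApp>(queue, user);
        app.setCurrentAppAttempt(attempt);
        return app.getCurrentAppAttempt(); // no (FiCaSchedulerApp) cast
      }
    }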
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 524b1ab..85d016b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -18,11 +18,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.base.Preconditions;
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -31,59 +47,231 @@
@Unstable
public abstract class SchedulerNode {
+ private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
+
+ private Resource availableResource = Resource.newInstance(0, 0);
+ private Resource usedResource = Resource.newInstance(0, 0);
+ private Resource totalResourceCapability;
+ private RMContainer reservedContainer;
+ private volatile int numContainers;
+
+
+ /* set of containers that are allocated containers */
+ private final Map<ContainerId, RMContainer> launchedContainers =
+ new HashMap<ContainerId, RMContainer>();
+
+ private final RMNode rmNode;
+ private final String nodeName;
+
+ public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+ this.rmNode = node;
+ this.availableResource = Resources.clone(node.getTotalCapability());
+ this.totalResourceCapability = Resources.clone(node.getTotalCapability());
+ if (usePortForNodeName) {
+ nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
+ } else {
+ nodeName = rmNode.getHostName();
+ }
+ }
+
+ public RMNode getRMNode() {
+ return this.rmNode;
+ }
+
+ /**
+ * Get the ID of the node which contains both its hostname and port.
+ *
+ * @return the ID of the node
+ */
+ public NodeId getNodeID() {
+ return this.rmNode.getNodeID();
+ }
+
+ public String getHttpAddress() {
+ return this.rmNode.getHttpAddress();
+ }
+
/**
* Get the name of the node for scheduling matching decisions.
*
- * Typically this is the 'hostname' reported by the node, but it could be
- * configured to be 'hostname:port' reported by the node via the
+ * Typically this is the 'hostname' reported by the node, but it could be
+ * configured to be 'hostname:port' reported by the node via the
* {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant.
* The main usecase of this is Yarn minicluster to be able to differentiate
* node manager instances by their port number.
*
* @return name of the node for scheduling matching decisions.
*/
- public abstract String getNodeName();
-
+ public String getNodeName() {
+ return nodeName;
+ }
+
/**
* Get rackname.
+ *
* @return rackname
*/
- public abstract String getRackName();
-
+ public String getRackName() {
+ return this.rmNode.getRackName();
+ }
+
/**
- * Get used resources on the node.
- * @return used resources on the node
+ * The Scheduler has allocated containers on this node to the given
+ * application.
+ *
+ * @param applicationId
+ * application
+ * @param rmContainer
+ * allocated container
*/
- public abstract Resource getUsedResource();
+ public synchronized void allocateContainer(ApplicationId applicationId,
+ RMContainer rmContainer) {
+ Container container = rmContainer.getContainer();
+ deductAvailableResource(container.getResource());
+ ++numContainers;
+
+ launchedContainers.put(container.getId(), rmContainer);
+
+ LOG.info("Assigned container " + container.getId() + " of capacity "
+ + container.getResource() + " on host " + rmNode.getNodeAddress()
+ + ", which currently has " + numContainers + " containers, "
+ + getUsedResource() + " used and " + getAvailableResource()
+ + " available");
+ }
/**
* Get available resources on the node.
+ *
* @return available resources on the node
*/
- public abstract Resource getAvailableResource();
+ public synchronized Resource getAvailableResource() {
+ return this.availableResource;
+ }
/**
- * Get number of active containers on the node.
- * @return number of active containers on the node
- */
- public abstract int getNumContainers();
-
- /**
- * Apply delta resource on node's available resource.
- * @param deltaResource the delta of resource need to apply to node
+ * Get used resources on the node.
+ *
+ * @return used resources on the node
*/
- public abstract void applyDeltaOnAvailableResource(Resource deltaResource);
+ public synchronized Resource getUsedResource() {
+ return this.usedResource;
+ }
/**
* Get total resources on the node.
+ *
* @return total resources on the node.
*/
- public abstract Resource getTotalResource();
-
+ public Resource getTotalResource() {
+ return this.totalResourceCapability;
+ }
+
+ private synchronized boolean isValidContainer(Container c) {
+ if (launchedContainers.containsKey(c.getId())) {
+ return true;
+ }
+ return false;
+ }
+
+ private synchronized void updateResource(Container container) {
+ addAvailableResource(container.getResource());
+ --numContainers;
+ }
+
/**
- * Get the ID of the node which contains both its hostname and port.
- * @return the ID of the node
+ * Release an allocated container on this node.
+ *
+ * @param container
+ * container to be released
+ */
+ public synchronized void releaseContainer(Container container) {
+ if (!isValidContainer(container)) {
+ LOG.error("Invalid container released " + container);
+ return;
+ }
+
+ /* remove the container from the nodemanager */
+ if (null != launchedContainers.remove(container.getId())) {
+ updateResource(container);
+ }
+
+ LOG.info("Released container " + container.getId() + " of capacity "
+ + container.getResource() + " on host " + rmNode.getNodeAddress()
+ + ", which currently has " + numContainers + " containers, "
+ + getUsedResource() + " used and " + getAvailableResource()
+ + " available" + ", release resources=" + true);
+ }
+
+ private synchronized void addAvailableResource(Resource resource) {
+ if (resource == null) {
+ LOG.error("Invalid resource addition of null resource for "
+ + rmNode.getNodeAddress());
+ return;
+ }
+ Resources.addTo(availableResource, resource);
+ Resources.subtractFrom(usedResource, resource);
+ }
+
+ private synchronized void deductAvailableResource(Resource resource) {
+ if (resource == null) {
+ LOG.error("Invalid deduction of null resource for "
+ + rmNode.getNodeAddress());
+ return;
+ }
+ Resources.subtractFrom(availableResource, resource);
+ Resources.addTo(usedResource, resource);
+ }
+
+ /**
+ * Reserve container for the attempt on this node.
+ */
+ public abstract void reserveResource(SchedulerApplicationAttempt attempt,
+ Priority priority, RMContainer container);
+
+ /**
+ * Unreserve resources on this node.
+ */
+ public abstract void unreserveResource(SchedulerApplicationAttempt attempt);
+
+ @Override
+ public String toString() {
+ return "host: " + rmNode.getNodeAddress() + " #containers="
+ + getNumContainers() + " available="
+ + getAvailableResource().getMemory() + " used="
+ + getUsedResource().getMemory();
+ }
+
+ /**
+ * Get number of active containers on the node.
+ *
+ * @return number of active containers on the node
*/
- public abstract NodeId getNodeID();
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ public synchronized List<RMContainer> getRunningContainers() {
+ return new ArrayList<RMContainer>(launchedContainers.values());
+ }
+
+ public synchronized RMContainer getReservedContainer() {
+ return reservedContainer;
+ }
+
+ protected synchronized void
+ setReservedContainer(RMContainer reservedContainer) {
+ this.reservedContainer = reservedContainer;
+ }
+ /**
+ * Apply delta resource on node's available resource.
+ *
+ * @param deltaResource
+ * the delta of resource need to apply to node
+ */
+ public synchronized void
+ applyDeltaOnAvailableResource(Resource deltaResource) {
+ // we can only adjust available resource if total resource is changed.
+ Resources.addTo(this.availableResource, deltaResource);
+ }
}
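
allocateContainer() and releaseContainer() now keep each node's counters in one place, and the bookkeeping itself is a paired transfer between the available and used totals. A standalone sketch of that arithmetic with the real Resource/Resources helpers:

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    class NodeBookkeepingSketch {
      public static void main(String[] args) {
        Resource available = Resource.newInstance(8192, 8); // node capacity
        Resource used = Resource.newInstance(0, 0);
        Resource container = Resource.newInstance(1024, 1);

        // allocateContainer() calls deductAvailableResource(container)
        Resources.subtractFrom(available, container);
        Resources.addTo(used, container);

        // releaseContainer() calls addAvailableResource(container)
        Resources.addTo(available, container);
        Resources.subtractFrom(used, container);

        System.out.println("available=" + available + " used=" + used);
      }
    }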
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 2348603..21eba39 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -69,7 +69,15 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
@Public
@Stable
public List<QueueUserACLInfo> getQueueUserAclInfo();
-
+
+ /**
+ * Get the whole resource capacity of the cluster.
+ * @return the whole resource capacity of the cluster.
+ */
+ @LimitedPrivate("yarn")
+ @Unstable
+ public Resource getClusterResource();
+
/**
* Get minimum allocatable {@link Resource}.
* @return minimum allocatable resource
@@ -182,7 +190,7 @@ boolean checkAccess(UserGroupInformation callerUGI,
@LimitedPrivate("yarn")
@Unstable
public RMContainer getRMContainer(ContainerId containerId);
-
+
/**
* Moves the given application to the given queue
* @param appId
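
With the accessor on the interface, any holder of a YarnScheduler reference can read total capacity without knowing the concrete scheduler. A minimal consumer sketch (the helper and its name are illustrative, not part of the patch):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

    class ClusterCapacitySketch {
      // hypothetical helper: fraction of cluster memory an ask would occupy
      static float memoryFraction(YarnScheduler scheduler, Resource ask) {
        Resource total = scheduler.getClusterResource();
        return (float) ask.getMemory() / total.getMemory();
      }
    }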
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e28c18c..9eed61f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -49,11 +48,9 @@
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -75,9 +72,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -100,9 +95,9 @@
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
-public class CapacityScheduler extends AbstractYarnScheduler
- implements PreemptableResourceScheduler, CapacitySchedulerContext,
- Configurable {
+public class CapacityScheduler extends
+ AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
+ PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
@@ -182,16 +177,8 @@ public Configuration getConf() {
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
- private Map<NodeId, FiCaSchedulerNode> nodes =
- new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
-
- private Resource clusterResource =
- RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
private int numNodeManagers = 0;
- private Resource minimumAllocation;
- private Resource maximumAllocation;
-
private boolean initialized = false;
private ResourceCalculator calculator;
@@ -231,16 +218,6 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() {
}
@Override
- public Resource getMinimumResourceCapability() {
- return minimumAllocation;
- }
-
- @Override
- public Resource getMaximumResourceCapability() {
- return maximumAllocation;
- }
-
- @Override
public Comparator<FiCaSchedulerApp> getApplicationComparator() {
return applicationComparator;
}
@@ -264,11 +241,6 @@ public synchronized int getNumClusterNodes() {
public RMContext getRMContext() {
return this.rmContext;
}
-
- @Override
- public Resource getClusterResources() {
- return clusterResource;
- }
@Override
public synchronized void
@@ -283,7 +255,7 @@ public Resource getClusterResources() {
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+ new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
initializeQueues(this.conf);
@@ -536,8 +508,8 @@ private synchronized void addApplication(ApplicationId applicationId,
.handle(new RMAppRejectedEvent(applicationId, ace.toString()));
return;
}
- SchedulerApplication application =
- new SchedulerApplication(queue, user);
+ SchedulerApplication<FiCaSchedulerApp> application =
+ new SchedulerApplication<FiCaSchedulerApp>(queue, user);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
@@ -548,7 +520,7 @@ private synchronized void addApplication(ApplicationId applicationId,
private synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
+ SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();
@@ -572,7 +544,8 @@ private synchronized void addApplicationAttempt(
private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
+ SchedulerApplication<FiCaSchedulerApp> application =
+ applications.get(applicationId);
if (application == null){
// The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
// ignore it.
@@ -597,7 +570,7 @@ private synchronized void doneApplicationAttempt(
" finalState=" + rmAppAttemptFinalState);
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
- SchedulerApplication application =
+ SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
if (application == null || attempt == null) {
@@ -659,7 +632,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(
- ask, getResourceCalculator(), getClusterResources(),
+ ask, getResourceCalculator(), getClusterResource(),
getMinimumResourceCapability(), maximumAllocation);
// Release containers
@@ -822,7 +795,7 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
// Try to schedule more if there are no reservations to fulfill
if (node.getReservedContainer() == null) {
- if (Resources.greaterThanOrEqual(calculator, getClusterResources(),
+ if (Resources.greaterThanOrEqual(calculator, getClusterResource(),
node.getAvailableResource(), minimumAllocation)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
@@ -942,7 +915,7 @@ private synchronized void addNode(RMNode nodeManager) {
}
private synchronized void removeNode(RMNode nodeInfo) {
- FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
+ FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
if (node == null) {
return;
}
@@ -1015,28 +988,10 @@ private synchronized void completedContainer(RMContainer rmContainer,
@Lock(Lock.NoLock.class)
@VisibleForTesting
- public FiCaSchedulerApp getApplicationAttempt(
- ApplicationAttemptId applicationAttemptId) {
- SchedulerApplication app =
- applications.get(applicationAttemptId.getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
- @Override
- public SchedulerAppReport getSchedulerAppInfo(
- ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
- return app == null ? null : new SchedulerAppReport(app);
- }
-
@Override
- public ApplicationResourceUsageReport getAppResourceUsageReport(
+ public FiCaSchedulerApp getApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
- return app == null ? null : app.getResourceUsageReport();
+ return super.getApplicationAttempt(applicationAttemptId);
}
@Lock(Lock.NoLock.class)
@@ -1048,24 +1003,6 @@ FiCaSchedulerNode getNode(NodeId nodeId) {
Map<NodeId, FiCaSchedulerNode> getAllNodes() {
return nodes;
}
-
- @Override
- public RMContainer getRMContainer(ContainerId containerId) {
- FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
- return (attempt == null) ? null : attempt.getRMContainer(containerId);
- }
-
- @VisibleForTesting
- public FiCaSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
@Override
@Lock(Lock.NoLock.class)
@@ -1074,12 +1011,6 @@ public void recover(RMState state) throws Exception {
}
@Override
- public SchedulerNodeReport getNodeReport(NodeId nodeId) {
- FiCaSchedulerNode node = getNode(nodeId);
- return node == null ? null : new SchedulerNodeReport(node);
- }
-
- @Override
public void dropContainerReservation(RMContainer container) {
if(LOG.isDebugEnabled()){
LOG.debug("DROP_RESERVATION:" + container.toString());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 3dd4c6d..a3dbc35 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -43,7 +43,7 @@
RMContext getRMContext();
- Resource getClusterResources();
+ Resource getClusterResource();
/**
* Get the yarn configuration.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 971edb8..5ddb9a4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -174,12 +174,12 @@ public LeafQueue(CapacitySchedulerContext cs,
int maxActiveApplications =
CSQueueUtils.computeMaxActiveApplications(
resourceCalculator,
- cs.getClusterResources(), this.minimumAllocation,
+ cs.getClusterResource(), this.minimumAllocation,
maxAMResourcePerQueuePercent, absoluteMaxCapacity);
this.maxActiveAppsUsingAbsCap =
CSQueueUtils.computeMaxActiveApplications(
resourceCalculator,
- cs.getClusterResources(), this.minimumAllocation,
+ cs.getClusterResource(), this.minimumAllocation,
maxAMResourcePerQueuePercent, absoluteCapacity);
int maxActiveApplicationsPerUser =
CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit,
@@ -195,7 +195,7 @@ public LeafQueue(CapacitySchedulerContext cs,
cs.getConfiguration().getAcls(getQueuePath());
setupQueueConfigs(
- cs.getClusterResources(),
+ cs.getClusterResource(),
capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity,
userLimit, userLimitFactor,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 1f09475..dba92a6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -143,7 +143,7 @@ public ParentQueue(CapacitySchedulerContext cs,
this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
- setupQueueConfigs(cs.getClusterResources(),
+ setupQueueConfigs(cs.getClusterResource(),
capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 58f12d0..7bab760 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -18,248 +18,84 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.util.resource.Resources;
public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
- private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
- private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
- private Resource totalResourceCapability;
-
- private volatile int numContainers;
-
- private RMContainer reservedContainer;
-
- /* set of containers that are allocated containers */
- private final Map<ContainerId, RMContainer> launchedContainers =
- new HashMap<ContainerId, RMContainer>();
-
- private final RMNode rmNode;
- private final String nodeName;
-
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
- this.rmNode = node;
- this.availableResource.setMemory(node.getTotalCapability().getMemory());
- this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
- totalResourceCapability =
- Resource.newInstance(node.getTotalCapability().getMemory(), node
- .getTotalCapability().getVirtualCores());
- if (usePortForNodeName) {
- nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
- } else {
- nodeName = rmNode.getHostName();
- }
- }
-
- public RMNode getRMNode() {
- return this.rmNode;
- }
-
- public NodeId getNodeID() {
- return this.rmNode.getNodeID();
- }
-
- public String getHttpAddress() {
- return this.rmNode.getHttpAddress();
+ super(node, usePortForNodeName);
}
@Override
- public String getNodeName() {
- return nodeName;
- }
-
- @Override
- public String getRackName() {
- return this.rmNode.getRackName();
- }
-
- /**
- * The Scheduler has allocated containers on this node to the
- * given application.
- *
- * @param applicationId application
- * @param rmContainer allocated container
- */
- public synchronized void allocateContainer(ApplicationId applicationId,
- RMContainer rmContainer) {
- Container container = rmContainer.getContainer();
- deductAvailableResource(container.getResource());
- ++numContainers;
-
- launchedContainers.put(container.getId(), rmContainer);
-
- LOG.info("Assigned container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " +
- getAvailableResource() + " available");
- }
-
- @Override
- public synchronized Resource getAvailableResource() {
- return this.availableResource;
- }
-
- @Override
- public synchronized Resource getUsedResource() {
- return this.usedResource;
- }
-
- @Override
- public Resource getTotalResource() {
- return this.totalResourceCapability;
- }
-
- private synchronized boolean isValidContainer(Container c) {
- if (launchedContainers.containsKey(c.getId()))
- return true;
- return false;
- }
-
- private synchronized void updateResource(Container container) {
- addAvailableResource(container.getResource());
- --numContainers;
- }
-
- /**
- * Release an allocated container on this node.
- * @param container container to be released
- */
- public synchronized void releaseContainer(Container container) {
- if (!isValidContainer(container)) {
- LOG.error("Invalid container released " + container);
- return;
- }
-
- /* remove the containers from the nodemanger */
- if (null != launchedContainers.remove(container.getId())) {
- updateResource(container);
- }
-
- LOG.info("Released container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " + getAvailableResource()
- + " available" + ", release resources=" + true);
- }
-
-
- private synchronized void addAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid resource addition of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.addTo(availableResource, resource);
- Resources.subtractFrom(usedResource, resource);
- }
-
- private synchronized void deductAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid deduction of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.subtractFrom(availableResource, resource);
- Resources.addTo(usedResource, resource);
- }
-
- @Override
- public String toString() {
- return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
- " available=" + getAvailableResource().getMemory() +
- " used=" + getUsedResource().getMemory();
- }
-
- @Override
- public int getNumContainers() {
- return numContainers;
- }
-
- public synchronized List<RMContainer> getRunningContainers() {
- return new ArrayList<RMContainer>(launchedContainers.values());
- }
-
public synchronized void reserveResource(
- SchedulerApplicationAttempt application, Priority priority,
- RMContainer reservedContainer) {
+ SchedulerApplicationAttempt application, Priority priority,
+ RMContainer container) {
// Check if it's already reserved
- if (this.reservedContainer != null) {
+ RMContainer reservedContainer = getReservedContainer();
+ if (reservedContainer != null) {
// Sanity check
- if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
+ if (!container.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
- " on node " + reservedContainer.getReservedNode() +
- " when currently" + " reserved resource " + this.reservedContainer +
- " on node " + this.reservedContainer.getReservedNode());
+ " container " + container +
+ " on node " + container.getReservedNode() +
+ " when currently" + " reserved resource " + reservedContainer +
+ " on node " + reservedContainer.getReservedNode());
}
// Cannot reserve more than one application attempt on a given node!
// Reservation is still against attempt.
- if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
- reservedContainer.getContainer().getId().getApplicationAttemptId())) {
+ if (!reservedContainer.getContainer().getId().getApplicationAttemptId()
+ .equals(container.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
+ " container " + container +
" for application " + application.getApplicationAttemptId() +
" when currently" +
- " reserved container " + this.reservedContainer +
+ " reserved container " + reservedContainer +
" on node " + this);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updated reserved container "
- + reservedContainer.getContainer().getId() + " on node " + this
+ + container.getContainer().getId() + " on node " + this
+ " for application attempt "
+ application.getApplicationAttemptId());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Reserved container "
- + reservedContainer.getContainer().getId() + " on node " + this
+ + container.getContainer().getId() + " on node " + this
+ " for application attempt "
+ application.getApplicationAttemptId());
}
}
- this.reservedContainer = reservedContainer;
+ setReservedContainer(container);
}
+ @Override
public synchronized void unreserveResource(
SchedulerApplicationAttempt application) {
-
+
// adding NP checks as this can now be called for preemption
- if (reservedContainer != null
- && reservedContainer.getContainer() != null
- && reservedContainer.getContainer().getId() != null
- && reservedContainer.getContainer().getId().getApplicationAttemptId() != null) {
+ if (getReservedContainer() != null
+ && getReservedContainer().getContainer() != null
+ && getReservedContainer().getContainer().getId() != null
+ && getReservedContainer().getContainer().getId()
+ .getApplicationAttemptId() != null) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
- reservedContainer.getContainer().getId().getApplicationAttemptId();
+ getReservedContainer().getContainer().getId()
+ .getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
@@ -269,17 +105,6 @@ public synchronized void unreserveResource(
" on node " + this);
}
}
- reservedContainer = null;
- }
-
- public synchronized RMContainer getReservedContainer() {
- return reservedContainer;
+ setReservedContainer(null);
}
-
- @Override
- public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
- // we can only adjust available resource if total resource is changed.
- Resources.addTo(this.availableResource, deltaResource);
- }
-
}
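
The sanity checks retained above enforce one invariant: a node holds at most one reserved container, and only the owning application attempt may refresh or release it. A toy distillation of that invariant (plain strings standing in for the YARN identifiers):

    class ReservationSketch {
      private String reservedAttemptId; // attempt holding the reservation

      synchronized void reserve(String attemptId) {
        if (reservedAttemptId != null && !reservedAttemptId.equals(attemptId)) {
          // mirrors the IllegalStateException in reserveResource()
          throw new IllegalStateException("already reserved for "
              + reservedAttemptId + ", cannot reserve for " + attemptId);
        }
        reservedAttemptId = attemptId; // new reservation or same-attempt update
      }

      synchronized void unreserve(String attemptId) {
        if (reservedAttemptId != null && !reservedAttemptId.equals(attemptId)) {
          // mirrors the check in unreserveResource()
          throw new IllegalStateException(attemptId
              + " cannot unreserve a reservation held by " + reservedAttemptId);
        }
        reservedAttemptId = null;
      }
    }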
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index c265fcf..1e94046 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -119,9 +119,9 @@ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
// TODO: we might change these queue metrics around a little bit
// to match the semantics of the fair scheduler.
queueInfo.setCapacity((float) getFairShare().getMemory() /
- scheduler.getClusterCapacity().getMemory());
+ scheduler.getClusterResource().getMemory());
queueInfo.setCapacity((float) getResourceUsage().getMemory() /
- scheduler.getClusterCapacity().getMemory());
+ scheduler.getClusterResource().getMemory());
ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
if (includeChildQueues) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 97ea6d4..69f2ab3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -18,28 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
@@ -47,208 +35,56 @@
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
- private Resource availableResource;
- private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
- private Resource totalResourceCapability;
-
- private volatile int numContainers;
-
- private RMContainer reservedContainer;
private AppSchedulable reservedAppSchedulable;
-
- /* set of containers that are allocated containers */
- private final Map<ContainerId, RMContainer> launchedContainers =
- new HashMap<ContainerId, RMContainer>();
-
- private final RMNode rmNode;
- private final String nodeName;
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
- this.rmNode = node;
- this.availableResource = Resources.clone(node.getTotalCapability());
- totalResourceCapability =
- Resource.newInstance(node.getTotalCapability().getMemory(), node
- .getTotalCapability().getVirtualCores());
- if (usePortForNodeName) {
- nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
- } else {
- nodeName = rmNode.getHostName();
- }
- }
-
- public RMNode getRMNode() {
- return rmNode;
- }
-
- public NodeId getNodeID() {
- return rmNode.getNodeID();
- }
-
- public String getHttpAddress() {
- return rmNode.getHttpAddress();
- }
-
- @Override
- public String getNodeName() {
- return nodeName;
+ super(node, usePortForNodeName);
}
@Override
- public String getRackName() {
- return rmNode.getRackName();
- }
-
- /**
- * The Scheduler has allocated containers on this node to the
- * given application.
- *
- * @param applicationId application
- * @param rmContainer allocated container
- */
- public synchronized void allocateContainer(ApplicationId applicationId,
- RMContainer rmContainer) {
- Container container = rmContainer.getContainer();
- deductAvailableResource(container.getResource());
- ++numContainers;
-
- launchedContainers.put(container.getId(), rmContainer);
-
- LOG.info("Assigned container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " +
- getAvailableResource() + " available");
- }
-
- @Override
- public synchronized Resource getAvailableResource() {
- return availableResource;
- }
-
- @Override
- public synchronized Resource getUsedResource() {
- return usedResource;
- }
-
- private synchronized boolean isValidContainer(Container c) {
- if (launchedContainers.containsKey(c.getId())) {
- return true;
- }
- return false;
- }
-
- private synchronized void updateResource(Container container) {
- addAvailableResource(container.getResource());
- --numContainers;
- }
-
- /**
- * Release an allocated container on this node.
- * @param container container to be released
- */
- public synchronized void releaseContainer(Container container) {
- if (!isValidContainer(container)) {
- LOG.error("Invalid container released " + container);
- return;
- }
-
- /* remove the containers from the nodemanger */
- launchedContainers.remove(container.getId());
- updateResource(container);
-
- LOG.info("Released container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " + getAvailableResource()
- + " available" + ", release resources=" + true);
- }
-
-
- private synchronized void addAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid resource addition of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.addTo(availableResource, resource);
- Resources.subtractFrom(usedResource, resource);
- }
-
- @Override
- public Resource getTotalResource() {
- return this.totalResourceCapability;
- }
-
- private synchronized void deductAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid deduction of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.subtractFrom(availableResource, resource);
- Resources.addTo(usedResource, resource);
- }
-
- @Override
- public String toString() {
- return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
- " available=" + getAvailableResource() +
- " used=" + getUsedResource();
- }
-
- @Override
- public int getNumContainers() {
- return numContainers;
- }
-
- public synchronized List<RMContainer> getRunningContainers() {
- return new ArrayList<RMContainer>(launchedContainers.values());
- }
-
public synchronized void reserveResource(
- FSSchedulerApp application, Priority priority,
- RMContainer reservedContainer) {
+ SchedulerApplicationAttempt application, Priority priority,
+ RMContainer container) {
// Check if it's already reserved
- if (this.reservedContainer != null) {
+ RMContainer reservedContainer = getReservedContainer();
+ if (reservedContainer != null) {
// Sanity check
- if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
+ if (!container.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
- " on node " + reservedContainer.getReservedNode() +
- " when currently" + " reserved resource " + this.reservedContainer +
- " on node " + this.reservedContainer.getReservedNode());
+ " container " + container +
+ " on node " + container.getReservedNode() +
+ " when currently" + " reserved resource " + reservedContainer +
+ " on node " + reservedContainer.getReservedNode());
}
// Cannot reserve more than one application on a given node!
- if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
- reservedContainer.getContainer().getId().getApplicationAttemptId())) {
+ if (!reservedContainer.getContainer().getId().getApplicationAttemptId()
+ .equals(container.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
+ " container " + container +
" for application " + application.getApplicationId() +
" when currently" +
- " reserved container " + this.reservedContainer +
+ " reserved container " + reservedContainer +
" on node " + this);
}
LOG.info("Updated reserved container " +
- reservedContainer.getContainer().getId() + " on node " +
+ container.getContainer().getId() + " on node " +
this + " for application " + application);
} else {
- LOG.info("Reserved container " + reservedContainer.getContainer().getId() +
+ LOG.info("Reserved container " + container.getContainer().getId() +
" on node " + this + " for application " + application);
}
- this.reservedContainer = reservedContainer;
- this.reservedAppSchedulable = application.getAppSchedulable();
+ setReservedContainer(container);
+ this.reservedAppSchedulable = ((FSSchedulerApp) application).getAppSchedulable();
}
+ @Override
public synchronized void unreserveResource(
- FSSchedulerApp application) {
+ SchedulerApplicationAttempt application) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
- reservedContainer.getContainer().getId().getApplicationAttemptId();
+ getReservedContainer().getContainer().getId().getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
@@ -258,22 +94,11 @@ public synchronized void unreserveResource(
" on node " + this);
}
- this.reservedContainer = null;
+ setReservedContainer(null);
this.reservedAppSchedulable = null;
}
- public synchronized RMContainer getReservedContainer() {
- return reservedContainer;
- }
-
public synchronized AppSchedulable getReservedAppSchedulable() {
return reservedAppSchedulable;
}
-
- @Override
- public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
- // we can only adjust available resource if total resource is changed.
- Resources.addTo(this.availableResource, deltaResource);
- }
-
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index fab9ebe..5eeda64 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -39,7 +39,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -53,7 +52,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -76,10 +74,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -122,11 +118,11 @@
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
-public class FairScheduler extends AbstractYarnScheduler {
+public class FairScheduler extends
+ AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
private boolean initialized;
private FairSchedulerConfiguration conf;
- private Resource minimumAllocation;
- private Resource maximumAllocation;
+
private Resource incrAllocation;
private QueueManager queueMgr;
private Clock clock;
@@ -152,14 +148,6 @@
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
- // Nodes in the cluster, indexed by NodeId
- private Map<NodeId, FSSchedulerNode> nodes =
- new ConcurrentHashMap<NodeId, FSSchedulerNode>();
-
- // Aggregate capacity of the cluster
- private Resource clusterCapacity =
- RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
-
// How often tasks are preempted
protected long preemptionInterval;
@@ -246,23 +234,6 @@ public QueueManager getQueueManager() {
return queueMgr;
}
- @Override
- public RMContainer getRMContainer(ContainerId containerId) {
- FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
- return (attempt == null) ? null : attempt.getRMContainer(containerId);
- }
-
- private FSSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FSSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
/**
* A runnable which calls {@link FairScheduler#update()} every
* UPDATE_INTERVAL milliseconds.
@@ -294,7 +265,7 @@ protected synchronized void update() {
// Recursively update demands for all queues
rootQueue.updateDemand();
- rootQueue.setFairShare(clusterCapacity);
+ rootQueue.setFairShare(clusterResource);
// Recursively compute fair shares for all queues
// and update metrics
rootQueue.recomputeShares();
@@ -322,9 +293,9 @@ private void updatePreemptionVariables() {
* Is a queue below its min share for the given task type?
*/
boolean isStarvedForMinShare(FSLeafQueue sched) {
- Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand());
- return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+ return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), desiredShare);
}
@@ -333,9 +304,9 @@ boolean isStarvedForMinShare(FSLeafQueue sched) {
* defined as being below half its fair share.
*/
boolean isStarvedForFairShare(FSLeafQueue sched) {
- Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
- return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+ return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), desiredFairShare);
}
@@ -362,7 +333,7 @@ protected synchronized void preemptTasksIfNecessary() {
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
}
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
Resources.none())) {
preemptResources(queueMgr.getLeafQueues(), resToPreempt);
}
@@ -389,7 +360,7 @@ protected void preemptResources(Collection<FSLeafQueue> scheds,
// Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>();
for (FSLeafQueue sched : scheds) {
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), sched.getFairShare())) {
for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
@@ -421,7 +392,7 @@ public int compare(RMContainer c1, RMContainer c2) {
while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next();
if (container.getState() == RMContainerState.RUNNING &&
- Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
warnOrKillContainer(container, apps.get(container), queues.get(container));
preemptedThisRound.add(container);
@@ -435,12 +406,12 @@ public int compare(RMContainer c1, RMContainer c2) {
// sure we don't preempt too many from any queue
Iterator<RMContainer> runningIter = runningContainers.iterator();
while (runningIter.hasNext() &&
- Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
RMContainer container = runningIter.next();
FSLeafQueue sched = queues.get(container);
if (!preemptedThisRound.contains(container) &&
- Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), sched.getFairShare())) {
warnOrKillContainer(container, apps.get(container), sched);
@@ -496,20 +467,20 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
- Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand());
- resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+ resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
- Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getFairShare(), sched.getDemand());
- resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+ resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
- Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
resDueToMinShare, resDueToFairShare);
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
resToPreempt, Resources.none())) {
String message = "Should preempt " + resToPreempt + " res for queue "
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare
@@ -540,18 +511,12 @@ public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
return resourceWeights;
}
- @Override
- public Resource getMinimumResourceCapability() {
- return minimumAllocation;
- }
-
public Resource getIncrementResourceCapability() {
return incrAllocation;
}
- @Override
- public Resource getMaximumResourceCapability() {
- return maximumAllocation;
+ private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
+ return nodes.get(nodeId);
}
public double getNodeLocalityThreshold() {
@@ -578,10 +543,6 @@ public synchronized int getContinuousSchedulingSleepMs() {
return continuousSchedulingSleepMs;
}
- public Resource getClusterCapacity() {
- return clusterCapacity;
- }
-
public synchronized Clock getClock() {
return clock;
}
@@ -629,8 +590,8 @@ protected synchronized void addApplication(ApplicationId applicationId,
return;
}
- SchedulerApplication application =
- new SchedulerApplication(queue, user);
+ SchedulerApplication<FSSchedulerApp> application =
+ new SchedulerApplication<FSSchedulerApp>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
@@ -647,7 +608,7 @@ protected synchronized void addApplication(ApplicationId applicationId,
protected synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
+ SchedulerApplication<FSSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
@@ -720,7 +681,8 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
private synchronized void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
+ SchedulerApplication<FSSchedulerApp> application =
+ applications.get(applicationId);
if (application == null){
LOG.warn("Couldn't find application " + applicationId);
return;
@@ -734,7 +696,7 @@ private synchronized void removeApplicationAttempt(
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- SchedulerApplication application =
+ SchedulerApplication<FSSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
@@ -809,7 +771,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
}
// Get the node on which the container was allocated
- FSSchedulerNode node = nodes.get(container.getNodeId());
+ FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
if (rmContainer.getState() == RMContainerState.RESERVED) {
application.unreserve(node, rmContainer.getReservedPriority());
@@ -827,20 +789,20 @@ private synchronized void completedContainer(RMContainer rmContainer,
private synchronized void addNode(RMNode node) {
nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
- Resources.addTo(clusterCapacity, node.getTotalCapability());
+ Resources.addTo(clusterResource, node.getTotalCapability());
updateRootQueueMetrics();
LOG.info("Added node " + node.getNodeAddress() +
- " cluster capacity: " + clusterCapacity);
+ " cluster capacity: " + clusterResource);
}
private synchronized void removeNode(RMNode rmNode) {
- FSSchedulerNode node = nodes.get(rmNode.getNodeID());
+ FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
// This can occur when an UNHEALTHY node reconnects
if (node == null) {
return;
}
- Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+ Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
updateRootQueueMetrics();
// Remove running containers
@@ -865,7 +827,7 @@ private synchronized void removeNode(RMNode rmNode) {
nodes.remove(rmNode.getNodeID());
LOG.info("Removed node " + rmNode.getNodeAddress() +
- " cluster capacity: " + clusterCapacity);
+ " cluster capacity: " + clusterResource);
}
@Override
@@ -882,7 +844,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
- clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
+ clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {
@@ -961,13 +923,13 @@ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode no
*/
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
- LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+ LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
}
eventLog.log("HEARTBEAT", nm.getHostName());
- FSSchedulerNode node = nodes.get(nm.getNodeID());
+ FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
// Update resource if any change
- SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
+ SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -1012,7 +974,7 @@ private void continuousScheduling() {
// iterate all nodes
for (NodeId nodeId : nodeIdList) {
if (nodes.containsKey(nodeId)) {
- FSSchedulerNode node = nodes.get(nodeId);
+ FSSchedulerNode node = getFSSchedulerNode(nodeId);
try {
if (Resources.fitsIn(minimumAllocation,
node.getAvailableResource())) {
@@ -1038,7 +1000,7 @@ private void continuousScheduling() {
@Override
public int compare(NodeId n1, NodeId n2) {
- return RESOURCE_CALCULATOR.compare(clusterCapacity,
+ return RESOURCE_CALCULATOR.compare(clusterResource,
nodes.get(n2).getAvailableResource(),
nodes.get(n1).getAvailableResource());
}
@@ -1075,7 +1037,7 @@ private synchronized void attemptScheduling(FSSchedulerNode node) {
int assignedContainers = 0;
while (node.getReservedContainer() == null) {
boolean assignedContainer = false;
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
queueMgr.getRootQueue().assignContainer(node),
Resources.none())) {
assignedContainers++;
@@ -1089,45 +1051,8 @@ private synchronized void attemptScheduling(FSSchedulerNode node) {
updateRootQueueMetrics();
}
- @Override
- public SchedulerNodeReport getNodeReport(NodeId nodeId) {
- FSSchedulerNode node = nodes.get(nodeId);
- return node == null ? null : new SchedulerNodeReport(node);
- }
-
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
- SchedulerApplication app =
- applications.get(appAttemptId.getApplicationId());
- if (app != null) {
- return (FSSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
- @Override
- public SchedulerAppReport getSchedulerAppInfo(
- ApplicationAttemptId appAttemptId) {
- FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
- if (attempt == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
- }
- return null;
- }
- return new SchedulerAppReport(attempt);
- }
-
- @Override
- public ApplicationResourceUsageReport getAppResourceUsageReport(
- ApplicationAttemptId appAttemptId) {
- FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
- if (attempt == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
- }
- return null;
- }
- return attempt.getResourceUsageReport();
+ return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId);
}
/**
@@ -1139,7 +1064,7 @@ public ApplicationResourceUsageReport getAppResourceUsageReport(
private void updateRootQueueMetrics() {
rootMetrics.setAvailableResourcesToQueue(
Resources.subtract(
- clusterCapacity, rootMetrics.getAllocatedResources()));
+ clusterResource, rootMetrics.getAllocatedResources()));
}
@Override
@@ -1258,7 +1183,7 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
this.rmContext = rmContext;
// This stores per-application scheduling information
this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+ new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
@@ -1365,7 +1290,7 @@ public void onReload(AllocationConfiguration queueInfo) {
// if it does not already exist, so it can be displayed on the web UI.
synchronized (FairScheduler.this) {
allocConf = queueInfo;
- allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
+ allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
queueMgr.updateAllocationConfiguration(allocConf);
}
}
@@ -1385,7 +1310,7 @@ public void onReload(AllocationConfiguration queueInfo) {
@Override
public synchronized String moveApplication(ApplicationId appId,
String queueName) throws YarnException {
- SchedulerApplication app = applications.get(appId);
+ SchedulerApplication<FSSchedulerApp> app = applications.get(appId);
if (app == null) {
throw new YarnException("App to be moved " + appId + " not found.");
}
@@ -1449,8 +1374,8 @@ private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
* Helper for moveApplication, which has appropriate synchronization, so all
* operations will be atomic.
*/
- private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
- FSLeafQueue oldQueue, FSLeafQueue newQueue) {
+ private void executeMove(SchedulerApplication<FSSchedulerApp> app,
+ FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) {
boolean wasRunnable = oldQueue.removeApp(attempt);
// if app was not runnable before, it may be runnable now
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 44918f3..4f8735b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -369,7 +369,7 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
// Set scheduling policies
try {
SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
- policy.initialize(scheduler.getClusterCapacity());
+ policy.initialize(scheduler.getClusterResource());
queue.setPolicy(policy);
} catch (AllocationConfigurationException ex) {
LOG.warn("Cannot apply configured scheduling policy to queue "
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 21fcdec..bc3441b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -25,7 +25,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
@@ -38,7 +37,6 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -76,11 +74,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -104,7 +100,8 @@
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
-public class FifoScheduler extends AbstractYarnScheduler implements
+public class FifoScheduler extends
+ AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
Configurable {
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -114,11 +111,7 @@
Configuration conf;
- protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
-
private boolean initialized;
- private Resource minimumAllocation;
- private Resource maximumAllocation;
private boolean usePortForNodeName;
private ActiveUsersManager activeUsersManager;
@@ -218,19 +211,9 @@ public synchronized Configuration getConf() {
}
@Override
- public Resource getMinimumResourceCapability() {
- return minimumAllocation;
- }
-
- @Override
public int getNumClusterNodes() {
return nodes.size();
}
-
- @Override
- public Resource getMaximumResourceCapability() {
- return maximumAllocation;
- }
@Override
public synchronized void
@@ -242,7 +225,7 @@ public Resource getMaximumResourceCapability() {
this.rmContext = rmContext;
//Use ConcurrentSkipListMap because applications need to be ordered
this.applications =
- new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
+ new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
this.minimumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -332,30 +315,6 @@ public Allocation allocate(
}
}
- @VisibleForTesting
- FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
- SchedulerApplication app =
- applications.get(applicationAttemptId.getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
- @Override
- public SchedulerAppReport getSchedulerAppInfo(
- ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
- return app == null ? null : new SchedulerAppReport(app);
- }
-
- @Override
- public ApplicationResourceUsageReport getAppResourceUsageReport(
- ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
- return app == null ? null : app.getResourceUsageReport();
- }
-
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
@@ -363,8 +322,8 @@ private FiCaSchedulerNode getNode(NodeId nodeId) {
@VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId,
String queue, String user) {
- SchedulerApplication application =
- new SchedulerApplication(DEFAULT_QUEUE, user);
+ SchedulerApplication<FiCaSchedulerApp> application =
+ new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
applications.put(applicationId, application);
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
@@ -377,7 +336,7 @@ public synchronized void addApplication(ApplicationId applicationId,
public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
+ SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
// TODO: Fix store
@@ -401,7 +360,8 @@ public synchronized void addApplication(ApplicationId applicationId,
private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
+ SchedulerApplication<FiCaSchedulerApp> application =
+ applications.get(applicationId);
if (application == null){
LOG.warn("Couldn't find application " + applicationId);
return;
@@ -419,7 +379,7 @@ private synchronized void doneApplicationAttempt(
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
throws IOException {
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
- SchedulerApplication application =
+ SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
if (application == null || attempt == null) {
throw new IOException("Unknown application " + applicationAttemptId +
@@ -456,13 +416,13 @@ private void assignContainers(FiCaSchedulerNode node) {
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
+ for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
.entrySet()) {
- FiCaSchedulerApp application =
- (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
+ FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
if (application == null) {
continue;
}
+
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@@ -499,7 +459,7 @@ private void assignContainers(FiCaSchedulerNode node) {
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
- for (SchedulerApplication application : applications.values()) {
+ for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
FiCaSchedulerApp attempt =
(FiCaSchedulerApp) application.getCurrentAppAttempt();
if (attempt == null) {
@@ -864,7 +824,6 @@ private synchronized void containerCompleted(RMContainer rmContainer,
}
- private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private synchronized void removeNode(RMNode nodeInfo) {
@@ -911,28 +870,11 @@ public void recover(RMState state) {
}
@Override
- public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
- FiCaSchedulerNode node = getNode(nodeId);
- return node == null ? null : new SchedulerNodeReport(node);
- }
-
- @Override
public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}
- private FiCaSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
@Override
public QueueMetrics getRootQueueMetrics() {
return DEFAULT_QUEUE.getMetrics();
@@ -943,13 +885,14 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
}
-
+
@Override
- public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
+ public synchronized List<ApplicationAttemptId>
+ getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
- List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
- applications.size());
- for (SchedulerApplication app : applications.values()) {
+ List<ApplicationAttemptId> attempts =
+ new ArrayList<ApplicationAttemptId>(applications.size());
+ for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
}
return attempts;
@@ -957,5 +900,4 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
return null;
}
}
-
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
index 1c5a79b..2c1bc47 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
@@ -70,7 +70,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
queueName = queue.getName();
schedulingPolicy = queue.getPolicy().getName();
- clusterResources = new ResourceInfo(scheduler.getClusterCapacity());
+ clusterResources = new ResourceInfo(scheduler.getClusterResource());
usedResources = new ResourceInfo(queue.getResourceUsage());
fractionMemUsed = (float)usedResources.getMemory() /
@@ -81,7 +81,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
maxResources = new ResourceInfo(queue.getMaxShare());
maxResources = new ResourceInfo(
Resources.componentwiseMin(queue.getMaxShare(),
- scheduler.getClusterCapacity()));
+ scheduler.getClusterResource()));
fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory();
fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 99b2f2e..6a449f5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -456,7 +456,7 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
Resource clusterResources =
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
- when(mCS.getClusterResources()).thenReturn(clusterResources);
+ when(mCS.getClusterResource()).thenReturn(clusterResources);
return policy;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index 8fcbf54..ca7b14a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -384,15 +384,18 @@ public void testCreatePreemptedContainerStatus() {
Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus());
}
- public static SchedulerApplication verifyAppAddedAndRemovedFromScheduler(
- final Map<ApplicationId, SchedulerApplication> applications,
- EventHandler<SchedulerEvent> handler, String queueName) throws Exception {
+ public static SchedulerApplication<SchedulerApplicationAttempt>
+ verifyAppAddedAndRemovedFromScheduler(
+ Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
+ EventHandler<SchedulerEvent> handler, String queueName)
+ throws Exception {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(appId, queueName, "user");
handler.handle(appAddedEvent);
- SchedulerApplication app = applications.get(appId);
+ SchedulerApplication<SchedulerApplicationAttempt> app =
+ applications.get(appId);
// verify application is added.
Assert.assertNotNull(app);
Assert.assertEquals("user", app.getUser());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 2b548ef..a9a9975 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -81,7 +81,7 @@ public void setUp() throws IOException {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
- when(csContext.getClusterResources()).
+ when(csContext.getClusterResource()).
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
@@ -165,7 +165,7 @@ public void testLimitsComputation() throws Exception {
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
- when(csContext.getClusterResources()).thenReturn(clusterResource);
+ when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
@@ -478,7 +478,7 @@ public void testHeadroom() throws Exception {
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB);
- when(csContext.getClusterResources()).thenReturn(clusterResource);
+ when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index d0ba334..1412039 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -29,8 +29,6 @@
import java.util.Comparator;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -56,8 +54,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -72,6 +73,7 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -465,14 +467,14 @@ public void testReconnectedNode() throws Exception {
cs.handle(new NodeAddedSchedulerEvent(n1));
cs.handle(new NodeAddedSchedulerEvent(n2));
- Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory());
+ Assert.assertEquals(6 * GB, cs.getClusterResource().getMemory());
// reconnect n1 with downgraded memory
n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
cs.handle(new NodeRemovedSchedulerEvent(n1));
cs.handle(new NodeAddedSchedulerEvent(n1));
- Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
+ Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory());
}
@Test
@@ -627,17 +629,17 @@ public void testGetAppsInQueue() throws Exception {
@Test
public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
-
- AsyncDispatcher rmDispatcher = new AsyncDispatcher();
- CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
- cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null,
- null, null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null));
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(conf);
+ @SuppressWarnings("unchecked")
+ AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
+ (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
+ .getResourceScheduler();
- SchedulerApplication app =
+ SchedulerApplication<SchedulerApplicationAttempt> app =
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
cs.getSchedulerApplications(), cs, "a1");
Assert.assertEquals("a1", app.getQueue().getQueueName());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index d509771..fd14ef6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -89,7 +89,7 @@ public void setUp() throws Exception {
Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(16*GB, 32));
- when(csContext.getClusterResources()).
+ when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index bdf89bb..2a26d30 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -122,7 +122,7 @@ public void setUp() throws Exception {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
- when(csContext.getClusterResources()).
+ when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
@@ -1651,7 +1651,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
- root.reinitialize(newRoot, cs.getClusterResources());
+ root.reinitialize(newRoot, cs.getClusterResource());
// after reinitialization
assertEquals(3, e.activeApplications.size());
@@ -1676,7 +1676,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
- root.reinitialize(newRoot, cs.getClusterResources());
+ root.reinitialize(newRoot, cs.getClusterResource());
// after reinitialization
assertEquals(60, e.getNodeLocalityDelay());
@@ -2070,7 +2070,7 @@ private CapacitySchedulerContext mockCSContext(
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(new YarnConfiguration());
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
- when(csContext.getClusterResources()).thenReturn(clusterResource);
+ when(csContext.getClusterResource()).thenReturn(clusterResource);
when(csContext.getMinimumResourceCapability()).thenReturn(
Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 652fcf1..fa9edb1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -86,7 +86,7 @@ public void setUp() throws Exception {
Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(16*GB, 32));
- when(csContext.getClusterResources()).
+ when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 2524763..fe2cb23 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -43,7 +43,6 @@
import javax.xml.parsers.ParserConfigurationException;
-import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -80,8 +79,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -95,12 +97,14 @@
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.xml.sax.SAXException;
import com.google.common.collect.Sets;
+@SuppressWarnings("unchecked")
public class TestFairScheduler {
static class MockClock implements Clock {
@@ -377,19 +381,19 @@ public void testAggregateCapacityTracking() throws Exception {
.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
- assertEquals(1024, scheduler.getClusterCapacity().getMemory());
+ assertEquals(1024, scheduler.getClusterResource().getMemory());
// Add another node
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
- assertEquals(1536, scheduler.getClusterCapacity().getMemory());
+ assertEquals(1536, scheduler.getClusterResource().getMemory());
// Remove the first node
NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1);
scheduler.handle(nodeEvent3);
- assertEquals(512, scheduler.getClusterCapacity().getMemory());
+ assertEquals(512, scheduler.getClusterResource().getMemory());
}
@Test
@@ -2123,7 +2127,7 @@ public void testBasicDRFAssignment() throws Exception {
FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
- drfPolicy.initialize(scheduler.getClusterCapacity());
+ drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();
@@ -2167,7 +2171,7 @@ public void testBasicDRFWithQueues() throws Exception {
FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
- drfPolicy.initialize(scheduler.getClusterCapacity());
+ drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();
@@ -2210,7 +2214,7 @@ public void testDRFHierarchicalQueues() throws Exception {
FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
- drfPolicy.initialize(scheduler.getClusterCapacity());
+ drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
@@ -2466,8 +2470,8 @@ public void testContinuousScheduling() throws Exception {
fs.handle(nodeEvent2);
// available resource
- Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024);
- Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16);
+ Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024);
+ Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16);
// send application request
ApplicationAttemptId appAttemptId =
@@ -2647,8 +2651,9 @@ public void testGetAppsInQueue() throws Exception {
@Test
public void testAddAndRemoveAppFromFairScheduler() throws Exception {
- FairScheduler scheduler =
- (FairScheduler) resourceManager.getResourceScheduler();
+ AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> scheduler =
+ (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) resourceManager
+ .getResourceScheduler();
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
scheduler.getSchedulerApplications(), scheduler, "default");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index f6dfc3f..f5bfc37 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -20,7 +20,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-
import static org.mockito.Mockito.mock;
import java.io.IOException;
@@ -30,8 +29,6 @@
import java.util.List;
import java.util.Map;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -60,13 +57,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -78,6 +75,7 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -594,9 +592,12 @@ public void testGetAppsInQueue() throws Exception {
public void testAddAndRemoveAppFromFiFoScheduler() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
- ResourceScheduler.class);
+ ResourceScheduler.class);
MockRM rm = new MockRM(conf);
- FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler();
+ @SuppressWarnings("unchecked")
+ AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> fs =
+ (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
+ .getResourceScheduler();
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
fs.getSchedulerApplications(), fs, "queue");
}