diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 943ecb0..a129f24 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -141,6 +141,14 @@
+
+
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
index bee5275..35e22b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
@@ -55,7 +55,7 @@ public static QueueInfo newInstance(String queueName, float capacity,
float maximumCapacity, float currentCapacity,
List childQueues, List applications,
QueueState queueState, Set accessibleNodeLabels,
- String defaultNodeLabelExpression) {
+ String defaultNodeLabelExpression, String orderingPolicy) {
QueueInfo queueInfo = Records.newRecord(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setCapacity(capacity);
@@ -66,6 +66,7 @@ public static QueueInfo newInstance(String queueName, float capacity,
queueInfo.setQueueState(queueState);
queueInfo.setAccessibleNodeLabels(accessibleNodeLabels);
queueInfo.setDefaultNodeLabelExpression(defaultNodeLabelExpression);
+ queueInfo.setOrderingPolicy(orderingPolicy);
return queueInfo;
}
@@ -184,4 +185,13 @@ public static QueueInfo newInstance(String queueName, float capacity,
@Stable
public abstract void setDefaultNodeLabelExpression(
String defaultLabelExpression);
+
+ @Public
+ @Unstable
+ public abstract String getOrderingPolicy();
+
+ @Public
+ @Unstable
+ public abstract void setOrderingPolicy(
+ String orderingPolicy);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index b396f4d..b7375c0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -356,6 +356,7 @@ message QueueInfoProto {
repeated ApplicationReportProto applications = 7;
repeated string accessibleNodeLabels = 8;
optional string defaultNodeLabelExpression = 9;
+ optional string orderingPolicy = 10;
}
enum QueueACLProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java
index 8a5521d..da61a38 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java
@@ -151,5 +151,12 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
labelList.append(nodeLabel);
}
writer.println(labelList.toString());
+
+ writer.print("\tOrdering Policy : ");
+ if (null != queueInfo.getOrderingPolicy()) {
+ writer.println(queueInfo.getOrderingPolicy());
+ } else {
+ writer.println();
+ }
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index f468bc1..509b470 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -685,7 +685,7 @@ public YarnClusterMetrics createFakeYarnClusterMetrics() {
public QueueInfo createFakeQueueInfo() {
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
- createFakeAppReports(), QueueState.RUNNING, null, null);
+ createFakeAppReports(), QueueState.RUNNING, null, null, null);
}
public List createFakeQueueUserACLInfoList() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index 4b60c52..80f5892 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -1268,7 +1268,7 @@ public void testGetQueueInfo() throws Exception {
nodeLabels.add("GPU");
nodeLabels.add("JDK_7");
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
- null, null, QueueState.RUNNING, nodeLabels, "GPU");
+ null, null, QueueState.RUNNING, nodeLabels, "GPU", null);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@@ -1283,6 +1283,7 @@ public void testGetQueueInfo() throws Exception {
pw.println("\tMaximum Capacity : " + "80.0%");
pw.println("\tDefault Node Label expression : " + "GPU");
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
+ pw.println("\tOrdering Policy : ");
pw.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
@@ -1292,7 +1293,7 @@ public void testGetQueueInfo() throws Exception {
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
QueueCLI cli = createAndGetQueueCLI();
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
- null, null, QueueState.RUNNING, null, null);
+ null, null, QueueState.RUNNING, null, null, null);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@@ -1307,6 +1308,7 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
pw.println("\tMaximum Capacity : " + "80.0%");
pw.println("\tDefault Node Label expression : ");
pw.println("\tAccessible Node Labels : ");
+ pw.println("\tOrdering Policy : ");
pw.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
index 4b83500..09f437f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
@@ -368,4 +368,21 @@ public void setDefaultNodeLabelExpression(String defaultNodeLabelExpression) {
}
builder.setDefaultNodeLabelExpression(defaultNodeLabelExpression);
}
+
+ @Override
+ public String getOrderingPolicy() {
+ QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.hasOrderingPolicy()) ? p
+ .getOrderingPolicy() : null;
+ }
+
+ @Override
+ public void setOrderingPolicy(String orderingPolicy) {
+ maybeInitBuilder();
+ if (orderingPolicy == null) {
+ builder.clearOrderingPolicy();
+ return;
+ }
+ builder.setOrderingPolicy(orderingPolicy);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 80299c0..d6c0e2b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -482,7 +482,7 @@ public static void setup() throws Exception {
// it is recursive(has sub queues)
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
- "x && y"));
+ "x && y", null));
generateByNewInstance(QueueUserACLInfo.class);
generateByNewInstance(YarnClusterMetrics.class);
// for reservation system
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 5e0bbc7..0179847 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -57,6 +57,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerProcess;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
@@ -69,7 +71,7 @@
*/
@Private
@Unstable
-public class SchedulerApplicationAttempt {
+public class SchedulerApplicationAttempt implements SchedulerProcess {
private static final Log LOG = LogFactory
.getLog(SchedulerApplicationAttempt.class);
@@ -669,4 +671,58 @@ public void recordContainerAllocationTime(long value) {
public Set getBlacklistedNodes() {
return this.appSchedulingInfo.getBlackListCopy();
}
+
+ public Resource getDemand() {
+ Resource demand = Resources.createResource(0);
+ // Demand is current consumption plus outstanding requests
+ Resources.addTo(demand, getCurrentConsumption());
+
+ // Add up outstanding resource requests
+ synchronized (this) {
+ for (Priority p : getPriorities()) {
+ for (ResourceRequest r : getResourceRequests(p).values()) {
+ Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
+ Resources.addTo(demand, total);
+ }
+ }
+ }
+ return demand;
+ }
+
+ @Override
+ public int compareInputOrderTo(SchedulerProcess other) {
+ if (other instanceof SchedulerApplicationAttempt) {
+ return getApplicationId().compareTo(
+ ((SchedulerApplicationAttempt)other).getApplicationId());
+ }
+ return 1;//let other types go before this, if any
+ }
+
+ @Override
+ public String getId() {
+ return getApplicationId().toString();
+ }
+
+ private Resource cachedConsumption = Resources.createResource(0);
+
+ @Override
+ public Resource getCachedConsumption() {
+ return cachedConsumption;
+ }
+
+ private Resource cachedDemand = Resources.createResource(0);
+
+ @Override
+ public Resource getCachedDemand() {
+ return cachedDemand;
+ }
+
+ @Override
+ public void updateCaches() {
+ Resource updConsumption = Resources.createResource(0);
+ Resources.addTo(updConsumption, getCurrentConsumption());
+ cachedConsumption = updConsumption;
+ cachedDemand = getDemand();
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index c86c0ff..351d271 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1473,7 +1473,7 @@ public synchronized void removeQueue(String queueName)
// at this point we should have no more apps
if (disposableLeafQueue.getNumApplications() > 0) {
throw new SchedulerDynamicEditException("The queue " + queueName
- + " is not empty " + disposableLeafQueue.getApplications().size()
+ + " is not empty " + disposableLeafQueue.getNumActiveApplications()
+ " active apps " + disposableLeafQueue.pendingApplications.size()
+ " pending apps");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 102e553..e3ff5e1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -48,6 +48,9 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
+
+
import com.google.common.collect.ImmutableSet;
public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@@ -117,6 +120,14 @@
public static final String MAXIMUM_ALLOCATION_VCORES =
"maximum-allocation-vcores";
+ public static final String ORDERING_POLICY_CLASS = "ordering-policy-class";
+
+ public static final String DEFAULT_ORDERING_POLICY_CLASS = "";
+
+ public static final String ORDERING_POLICY = "ordering-policy";
+
+ public static final String DEFAULT_ORDERING_POLICY = "fifo";
+
@Private
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@@ -373,6 +384,35 @@ public int getUserLimit(String queue) {
DEFAULT_USER_LIMIT);
return userLimit;
}
+
+ @SuppressWarnings("unchecked")
+ public OrderingPolicy getOrderingPolicy(
+ String queue) {
+
+ String policyClass = get(getQueuePrefix(queue) + ORDERING_POLICY_CLASS,
+ DEFAULT_ORDERING_POLICY_CLASS);
+
+ if (policyClass.equals("")) {
+ policyClass =
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerComparatorPolicy";
+ }
+
+ String policyConfig = get(getQueuePrefix(queue) + ORDERING_POLICY,
+ DEFAULT_ORDERING_POLICY);
+
+ OrderingPolicy policy;
+ try {
+ policy = (OrderingPolicy) Class.forName(policyClass).newInstance();
+ } catch (Exception e) {
+ String message = "Unable to construct ordering policy for " + policyClass + ", message: " + e.getMessage();
+ LOG.error(message, e);
+ throw new RuntimeException(message, e);
+ }
+ if (policy instanceof SchedulerComparatorPolicy) {
+ ((SchedulerComparatorPolicy)policy).configureComparators(policyConfig);
+ }
+ return policy;
+ }
public void setUserLimit(String queue, int userLimit) {
setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3e5405d..2ec6627 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -93,7 +94,6 @@
private int nodeLocalityDelay;
- Set activeApplications;
Map applicationAttemptMap =
new HashMap();
@@ -121,6 +121,10 @@
private volatile ResourceLimits currentResourceLimits = null;
+ private OrderingPolicy
+ orderingPolicy =
+ new SchedulerComparatorPolicy(new FifoComparator());
+
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
@@ -137,7 +141,6 @@ public LeafQueue(CapacitySchedulerContext cs,
cs.getApplicationComparator();
this.pendingApplications =
new TreeSet(applicationComparator);
- this.activeApplications = new TreeSet(applicationComparator);
setupQueueConfigs(cs.getClusterResource());
}
@@ -159,6 +162,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
setQueueResourceLimitsInfo(clusterResource);
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+
+ setOrderingPolicy(conf.getOrderingPolicy(getQueuePath()));
+
userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@@ -322,7 +328,7 @@ public synchronized int getNumPendingApplications() {
}
public synchronized int getNumActiveApplications() {
- return activeApplications.size();
+ return orderingPolicy.getSchedulerProcessesSize();
}
@Private
@@ -363,6 +369,7 @@ public synchronized float getUserLimitFactor() {
public synchronized QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = getQueueInfo();
+ queueInfo.setOrderingPolicy(getOrderingPolicy().getInfo());
return queueInfo;
}
@@ -637,7 +644,7 @@ private synchronized void activateApplications() {
}
}
user.activateApplication();
- activeApplications.add(application);
+ orderingPolicy.addSchedulerProcess(application);
queueUsage.incAMUsed(application.getAMResource());
user.getResourceUsage().incAMUsed(application.getAMResource());
i.remove();
@@ -686,7 +693,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue)
public synchronized void removeApplicationAttempt(
FiCaSchedulerApp application, User user) {
- boolean wasActive = activeApplications.remove(application);
+ boolean wasActive =
+ orderingPolicy.removeSchedulerProcess(application);
if (!wasActive) {
pendingApplications.remove(application);
} else {
@@ -755,7 +763,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
- + " #applications=" + activeApplications.size());
+ + " #applications=" +
+ orderingPolicy.getSchedulerProcessesSize());
}
// if our queue cannot access this node, just return
@@ -775,9 +784,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
}
}
- // Try to assign containers to applications in order
- for (FiCaSchedulerApp application : activeApplications) {
-
+ for (Iterator assignmentIterator =
+ orderingPolicy.getAssignmentIterator();
+ assignmentIterator.hasNext();) {
+ FiCaSchedulerApp application = assignmentIterator.next();
if(LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application "
+ application.getApplicationId());
@@ -1542,6 +1552,9 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
if (allocatedContainer == null) {
return Resources.none();
}
+
+ // Inform the ordering policy
+ orderingPolicy.containerAllocated(application, allocatedContainer);
// Inform the node
node.allocateContainer(allocatedContainer);
@@ -1642,6 +1655,10 @@ public void completedContainer(Resource clusterResource,
removed =
application.containerCompleted(rmContainer, containerStatus,
event, node.getPartition());
+
+ // Inform the ordering policy
+ orderingPolicy.containerReleased(application, rmContainer);
+
node.releaseContainer(container);
}
@@ -1749,7 +1766,8 @@ public synchronized void updateClusterResource(Resource clusterResource,
activateApplications();
// Update application properties
- for (FiCaSchedulerApp application : activeApplications) {
+ for (FiCaSchedulerApp application :
+ orderingPolicy.getSchedulerProcesses()) {
synchronized (application) {
computeUserLimitAndSetHeadroom(application, clusterResource,
Resources.none(), null);
@@ -1855,19 +1873,19 @@ public void recoverContainer(Resource clusterResource,
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
-
+
/**
* Obtain (read-only) collection of active applications.
*/
- public Set getApplications() {
- // need to access the list of apps from the preemption monitor
- return activeApplications;
+ public Collection getApplications() {
+ return orderingPolicy.getSchedulerProcesses();
}
// return a single Resource capturing the overal amount of pending resources
public synchronized Resource getTotalResourcePending() {
Resource ret = BuilderUtils.newResource(0, 0);
- for (FiCaSchedulerApp f : activeApplications) {
+ for (FiCaSchedulerApp f :
+ orderingPolicy.getSchedulerProcesses()) {
Resources.addTo(ret, f.getTotalPendingRequests());
}
return ret;
@@ -1879,7 +1897,8 @@ public synchronized void collectSchedulerApplications(
for (FiCaSchedulerApp pendingApp : pendingApplications) {
apps.add(pendingApp.getApplicationAttemptId());
}
- for (FiCaSchedulerApp app : activeApplications) {
+ for (FiCaSchedulerApp app :
+ orderingPolicy.getSchedulerProcesses()) {
apps.add(app.getApplicationAttemptId());
}
}
@@ -1932,6 +1951,19 @@ public void setMaxApplications(int maxApplications) {
this.maxApplications = maxApplications;
}
+ public synchronized OrderingPolicy
+ getOrderingPolicy() {
+ return orderingPolicy;
+ }
+
+ public synchronized void setOrderingPolicy(
+ OrderingPolicy orderingPolicy) {
+ orderingPolicy.addAllSchedulerProcesses(
+ this.orderingPolicy.getSchedulerProcesses()
+ );
+ this.orderingPolicy = orderingPolicy;
+ }
+
/*
* Holds shared values used by all applications in
* the queue to calculate headroom on demand
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
index bf2a25b..753c94a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -68,10 +68,9 @@ public static SchedulingPolicy getInstance(Class extends SchedulingPolicy> cla
* @param policy canonical class name or "drf" or "fair" or "fifo"
* @throws AllocationConfigurationException
*/
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
public static SchedulingPolicy parse(String policy)
throws AllocationConfigurationException {
- @SuppressWarnings("rawtypes")
Class clazz;
String text = StringUtils.toLowerCase(policy);
if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java
new file mode 100644
index 0000000..117e1f4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+
+/**
+ * A SchedulerComparator which supports combining other
+ * SchedulerComparators together to implement overall logic.
+ * Comparison occurs in the order of the iterators in the comparators
+ * member, with the first non-0 value returned
+ * (0 if all results are 0)
+ */
+public class CompoundComparator
+ implements SchedulerComparator {
+
+ private List> comparators =
+ new ArrayList>();
+
+ @Override
+ public int compare(SchedulerProcess r1, SchedulerProcess r2) {
+ for (SchedulerComparator comparator : comparators) {
+ int result = comparator.compare(r1, r2);
+ if (result != 0) return result;
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean reorderOnContainerAllocate() {
+ for (SchedulerComparator comparator : comparators) {
+ boolean result = comparator.reorderOnContainerAllocate();
+ if (result) return result;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean reorderOnContainerRelease() {
+ for (SchedulerComparator comparator : comparators) {
+ boolean result = comparator.reorderOnContainerRelease();
+ if (result) return result;
+ }
+ return false;
+ }
+
+ public void setComparators(
+ List> comparators) {
+ this.comparators = comparators;
+ }
+
+ public List> getComparators() {
+ return comparators;
+ }
+
+ @Override
+ public void configure(String conf) {
+
+ }
+
+ @Override
+ public String getInfo() {
+ StringBuilder info = new StringBuilder();
+ info.append("CompoundComparator ( ");
+ for (Iterator> ic = comparators.iterator();
+ ic.hasNext();) {
+ SchedulerComparator c = ic.next();
+ info.append(c.getInfo());
+ if (ic.hasNext()) {
+ info.append(", ");
+ }
+ }
+ info.append(" )");
+ return info.toString();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java
new file mode 100644
index 0000000..14b58c1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+
+/**
+ * A comparator which orders SchedulerProcesses by submission order, earlier
+ * submission is lesser
+ */
+public class FifoComparator implements SchedulerComparator {
+
+ @Override
+ public int compare(SchedulerProcess r1, SchedulerProcess r2) {
+ int res = r1.compareInputOrderTo(r2);
+ if (res == 0) {
+ //cannot return equality for different processses, will result in
+ //"loss". process must always have unique ids to use as a fallback
+ res = r1.getId().compareTo(r2.getId());
+ }
+ return res;
+ }
+
+ @Override
+ public boolean reorderOnContainerAllocate() {
+ return false;
+ }
+
+ @Override
+ public boolean reorderOnContainerRelease() {
+ return false;
+ }
+
+ @Override
+ public void configure(String conf) {
+
+ }
+
+ @Override
+ public String getInfo() {
+ return "FifoComparator";
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
new file mode 100644
index 0000000..9529eb6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+
+/**
+ * An OrderingPolicy is used by the scheduler to order SchedulerProcesses for
+ * container assignment and preemption
+ */
+public interface OrderingPolicy {
+ /*
+ * Note: Some implementations of this interface depend upon external
+ * synchronization of all use of the SchedulerProcess Collection and Iterators
+ * for correctness and to avoid concurrent modification issues, therefore,
+ * all use of SchedulerProcesses should be externally synchronized
+ */
+
+ /**
+ * Get the collection of SchedulerProcesses which are managed by this
+ * OrderingPolicy - should include processes returned by the Assignment and
+ * Preemption iterator with no guarantees regarding order
+ */
+ public Collection getSchedulerProcesses();
+
+ /**
+ * Return an iterator over the collection of SchedulerProcesses which orders
+ * them for container assignment as appropriate for this policy
+ */
+ public Iterator getAssignmentIterator();
+
+ /**
+ * Return an iterator over the collection of SchedulerProcesses which orders
+ * them for preemption as appropriate for this policy
+ */
+ public Iterator getPreemptionIterator();
+
+ public void containerAllocated(S s, RMContainer c);
+
+ public void containerReleased(S s, RMContainer c);
+
+ /**
+ * Display information regarding the policy, should include identification of
+ * policy type & information regarding configuration & status
+ */
+ public String getInfo();
+
+ public void addSchedulerProcess(S s);
+
+ public boolean removeSchedulerProcess(S s);
+
+ public void addAllSchedulerProcesses(Collection sc);
+
+ public int getSchedulerProcessesSize();
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java
new file mode 100644
index 0000000..15d9aec
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+
+/**
+ * An extension of Comparator with additions necessary for use by the
+ * SchedulerComparatorPolicy
+ */
+public interface SchedulerComparator
+ extends Comparator {
+
+ public boolean reorderOnContainerAllocate();
+
+ public boolean reorderOnContainerRelease();
+
+ /**
+ * If the Comparator supports configuration it can be passed here
+ * @param conf The configuration information
+ */
+ public void configure(String conf);
+
+ /**
+ * Information about the comparator, should include it's type & potentially
+ * configuration options & status
+ */
+ public String getInfo();
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java
new file mode 100644
index 0000000..a122080
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+
+
+/**
+ * An implementation of the OrderingPolicy whose behavior is delegated to a
+ * SchedulerComparator definition. This implementation can be used for cases
+ * where ordering logic can be expressed effectively in terms of a Comparator
+ */
+public class SchedulerComparatorPolicy
+ implements OrderingPolicy {
+
+ private static final Log LOG = LogFactory.getLog(SchedulerComparatorPolicy.class);
+
+ protected TreeSet schedulerProcesses;
+ SchedulerComparator comparator;
+
+ public SchedulerComparatorPolicy() { }
+
+ public SchedulerComparatorPolicy(SchedulerComparator
+ comparator) {
+ setSchedulerComparator(comparator);
+ }
+
+ @Override
+ public Collection getSchedulerProcesses() {
+ return schedulerProcesses;
+ }
+
+ @Override
+ public Iterator getAssignmentIterator() {
+ return schedulerProcesses.iterator();
+ }
+
+ @Override
+ public Iterator getPreemptionIterator() {
+ return schedulerProcesses.descendingIterator();
+ }
+
+ private void reorderSchedulerProcess(S schedulerProcess) {
+ //remove, update comparable data, and reinsert to update position in order
+ schedulerProcesses.remove(schedulerProcess);
+ schedulerProcess.updateCaches();
+ schedulerProcesses.add(schedulerProcess);
+ }
+
+ public void containerAllocated(S schedulerProcess,
+ RMContainer r) {
+ if (comparator.reorderOnContainerAllocate()) {
+ reorderSchedulerProcess(schedulerProcess);
+ }
+ }
+
+ public void containerReleased(S schedulerProcess,
+ RMContainer r) {
+ if (comparator.reorderOnContainerRelease()) {
+ reorderSchedulerProcess(schedulerProcess);
+ }
+ }
+
+ public void setSchedulerComparator(SchedulerComparator comparator) {
+ this.comparator = comparator;
+ TreeSet schedulerProcesses = new TreeSet(comparator);
+ if (this.schedulerProcesses != null) {
+ schedulerProcesses.addAll(this.schedulerProcesses);
+ }
+ this.schedulerProcesses = schedulerProcesses;
+ }
+
+ public SchedulerComparator getSchedulerComparator() {
+ return comparator;
+ }
+
+ public void configureComparators(String comparatorConfiguration) {
+ String[] comparatorDefs = comparatorConfiguration.split(
+ java.util.regex.Pattern.quote("+"));
+ if (comparatorDefs.length == 1) {
+ setSchedulerComparator(configureComparator(comparatorDefs[0]));
+ } else {
+ CompoundComparator compoundComparator = new CompoundComparator();
+ for (int i = 0;i < comparatorDefs.length;i++) {
+ compoundComparator.getComparators().add(
+ configureComparator(comparatorDefs[i])
+ );
+ }
+ setSchedulerComparator(compoundComparator);
+ }
+ }
+
+ public static SchedulerComparator configureComparator(
+ String comparatorDef) {
+
+ comparatorDef = comparatorDef.trim();
+ String[] defComponents = comparatorDef.split(":");
+
+ String comparatorClass = defComponents[0];
+
+ if (comparatorClass.equals("fifo")) {
+ comparatorClass =
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator";
+ }
+ if (comparatorClass.equals("compound")) {
+ comparatorClass =
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.CompoundComparator";
+ }
+ SchedulerComparator comparator;
+ try {
+ comparator = (SchedulerComparator)
+ Class.forName(comparatorClass).newInstance();
+ } catch (Exception e) {
+ String message = "Unable to construct scheduler comparator, " + e.getMessage();
+ throw new RuntimeException(message, e);
+ }
+ if (defComponents.length > 1) {
+ comparator.configure(defComponents[1]);
+ }
+ return comparator;
+ }
+
+ public String getInfo() {
+ String info = "SchedulerComparatorPolicy: " + comparator.getInfo();
+ return info;
+ }
+
+ public void addSchedulerProcess(S s) {
+ schedulerProcesses.add(s);
+ }
+
+ public boolean removeSchedulerProcess(S s) {
+ return schedulerProcesses.remove(s);
+ }
+
+ public void addAllSchedulerProcesses(Collection sc) {
+ schedulerProcesses.addAll(sc);
+ }
+
+ public int getSchedulerProcessesSize() {
+ return schedulerProcesses.size();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java
new file mode 100644
index 0000000..f45f756
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+
+/**
+ * A scheduler process is a process to be scheduled, i.e.
+ * an application / application attempt
+ */
+public interface SchedulerProcess {
+
+ public int compareInputOrderTo(SchedulerProcess other);
+
+ /**
+ * Id - each process must have a unique id
+ */
+ public String getId();
+
+ /**
+ * Cached view of Resources currently consumed by the process
+ */
+ public Resource getCachedConsumption();
+
+ /**
+ * Cached view of Resources wanted by the process, is sum of current
+ * consumption and outstanding resource requests
+ */
+ public Resource getCachedDemand();
+
+ /**
+ * Update cached values to current underlying values
+ */
+ public void updateCaches();
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
index e62fd70..4419877 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
@@ -92,6 +92,7 @@ protected void render(Block html) {
_("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
_("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())).
_("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
+ _("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
_("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled");
html._(InfoBlock.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index 5258b3d..f94a492 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -39,6 +39,7 @@
protected ResourceInfo usedAMResource;
protected ResourceInfo userAMResourceLimit;
protected boolean preemptionDisabled;
+ protected String orderingPolicyInfo;
CapacitySchedulerLeafQueueInfo() {
};
@@ -57,6 +58,7 @@
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
preemptionDisabled = q.getPreemptionDisabled();
+ orderingPolicyInfo = q.getOrderingPolicy().getInfo();
}
public int getNumActiveApplications() {
@@ -107,4 +109,8 @@ public ResourceInfo getUserAMResourceLimit() {
public boolean getPreemptionDisabled() {
return preemptionDisabled;
}
+
+ public String getOrderingPolicyInfo() {
+ return orderingPolicyInfo;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 1ca5c97..5214723 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -150,11 +150,13 @@ private FiCaSchedulerApp getMockApplication(int appId, String user,
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
ApplicationAttemptId applicationAttemptId =
TestUtils.getMockApplicationAttemptId(appId, 0);
+ String id = applicationAttemptId.toString();
doReturn(applicationAttemptId.getApplicationId()).
when(application).getApplicationId();
doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
doReturn(user).when(application).getUser();
doReturn(amResource).when(application).getAMResource();
+ doReturn(id).when(application).getId();
return application;
}
@@ -469,7 +471,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertEquals(0, queue.getNumPendingApplications());
assertEquals(1, queue.getNumActiveApplications(user_0));
assertEquals(0, queue.getNumPendingApplications(user_0));
- assertTrue(queue.activeApplications.contains(app_0));
+ assertTrue(queue.getApplications().contains(app_0));
// Submit second application
FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
@@ -479,7 +481,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(0, queue.getNumPendingApplications(user_0));
- assertTrue(queue.activeApplications.contains(app_1));
+ assertTrue(queue.getApplications().contains(app_1));
// Submit third application, should remain pending
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
@@ -508,7 +510,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
assertFalse(queue.pendingApplications.contains(app_2));
- assertFalse(queue.activeApplications.contains(app_2));
+ assertFalse(queue.getApplications().contains(app_2));
// Finish 1st application, app_3 should become active
queue.finishApplicationAttempt(app_0, A);
@@ -516,9 +518,9 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(0, queue.getNumPendingApplications(user_0));
- assertTrue(queue.activeApplications.contains(app_3));
+ assertTrue(queue.getApplications().contains(app_3));
assertFalse(queue.pendingApplications.contains(app_3));
- assertFalse(queue.activeApplications.contains(app_0));
+ assertFalse(queue.getApplications().contains(app_0));
// Finish 2nd application
queue.finishApplicationAttempt(app_1, A);
@@ -526,7 +528,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertEquals(0, queue.getNumPendingApplications());
assertEquals(1, queue.getNumActiveApplications(user_0));
assertEquals(0, queue.getNumPendingApplications(user_0));
- assertFalse(queue.activeApplications.contains(app_1));
+ assertFalse(queue.getApplications().contains(app_1));
// Finish 4th application
queue.finishApplicationAttempt(app_3, A);
@@ -534,7 +536,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertEquals(0, queue.getNumPendingApplications());
assertEquals(0, queue.getNumActiveApplications(user_0));
assertEquals(0, queue.getNumPendingApplications(user_0));
- assertFalse(queue.activeApplications.contains(app_3));
+ assertFalse(queue.getApplications().contains(app_3));
}
@Test
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 972cabb..8b4bacb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -73,6 +73,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerComparatorPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.CompoundComparator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerComparator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerProcess;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -381,6 +387,45 @@ public void testUserQueueAcl() throws Exception {
d.submitApplicationAttempt(app_1, user_d); // same user
}
+ @Test
+ public void testPolicyConfiguration() throws Exception {
+
+ CapacitySchedulerConfiguration testConf =
+ new CapacitySchedulerConfiguration();
+
+ String tproot = CapacitySchedulerConfiguration.ROOT + "." +
+ "testPolicyRoot" + System.currentTimeMillis();
+
+ OrderingPolicy policy =
+ testConf.getOrderingPolicy(tproot);
+
+ //tests for expected defaults
+ SchedulerComparatorPolicy comPol =
+ (SchedulerComparatorPolicy) policy;
+ FifoComparator fcomp = (FifoComparator) comPol.getSchedulerComparator();
+
+ //override comparator default to compound
+ String comparatorConfig = CapacitySchedulerConfiguration.PREFIX + tproot +
+ "." + CapacitySchedulerConfiguration.ORDERING_POLICY;
+
+ //set multiple fifo's (just for testing), this will result in a
+ //compound containing multiple fifos
+ testConf.set(comparatorConfig, "fifo+fifo");
+ policy =
+ testConf.getOrderingPolicy(tproot);
+ comPol =
+ (SchedulerComparatorPolicy) policy;
+ CompoundComparator ccomp = (CompoundComparator) comPol.getSchedulerComparator();
+
+ List> comparators =
+ ccomp.getComparators();
+ assertEquals(2, comparators.size());
+ for (SchedulerComparator comparator : comparators) {
+ assertTrue(comparator instanceof FifoComparator);
+ }
+
+
+ }
@Test
public void testAppAttemptMetrics() throws Exception {
@@ -2011,7 +2056,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
e.submitApplicationAttempt(app_2, user_e); // same user
// before reinitialization
- assertEquals(2, e.activeApplications.size());
+ assertEquals(2, e.getNumActiveApplications());
assertEquals(1, e.pendingApplications.size());
csConf.setDouble(CapacitySchedulerConfiguration
@@ -2028,7 +2073,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
root.reinitialize(newRoot, csContext.getClusterResource());
// after reinitialization
- assertEquals(3, e.activeApplications.size());
+ assertEquals(3, e.getNumActiveApplications());
assertEquals(0, e.pendingApplications.size());
}
@@ -2092,7 +2137,7 @@ public void testActivateApplicationByUpdatingClusterResource()
e.submitApplicationAttempt(app_2, user_e); // same user
// before updating cluster resource
- assertEquals(2, e.activeApplications.size());
+ assertEquals(2, e.getNumActiveApplications());
assertEquals(1, e.pendingApplications.size());
Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32);
@@ -2100,7 +2145,7 @@ public void testActivateApplicationByUpdatingClusterResource()
new ResourceLimits(clusterResource));
// after updating cluster resource
- assertEquals(3, e.activeApplications.size());
+ assertEquals(3, e.getNumActiveApplications());
assertEquals(0, e.pendingApplications.size());
}
@@ -2451,6 +2496,84 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
+ "forget to set off-switch request should be handled");
}
}
+
+ @Test
+ public void testFifoAssignment() throws Exception {
+
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+ a.setOrderingPolicy(
+ new SchedulerComparatorPolicy(new FifoComparator()));
+
+ String host_0_0 = "127.0.0.1";
+ String rack_0 = "rack_0";
+ FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
+
+ final int numNodes = 4;
+ Resource clusterResource = Resources.createResource(
+ numNodes * (16*GB), numNodes * 16);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ String user_0 = "user_0";
+
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 =
+ spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext));
+ a.submitApplicationAttempt(app_0, user_0);
+
+ final ApplicationAttemptId appAttemptId_1 =
+ TestUtils.getMockApplicationAttemptId(1, 0);
+ FiCaSchedulerApp app_1 =
+ spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext));
+ a.submitApplicationAttempt(app_1, user_0);
+
+ Priority priority = TestUtils.createMockPriority(1);
+ List app_0_requests_0 = new ArrayList();
+ List app_1_requests_0 = new ArrayList();
+
+ app_0_requests_0.clear();
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
+ true, priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+
+ app_1_requests_0.clear();
+ app_1_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+ true, priority, recordFactory));
+ app_1.updateResourceRequests(app_1_requests_0);
+
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource));
+ Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource));
+ Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+ app_0_requests_0.clear();
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+ true, priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+
+ app_1_requests_0.clear();
+ app_1_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+ true, priority, recordFactory));
+ app_1.updateResourceRequests(app_1_requests_0);
+
+ //Even thought it already has more resources, app_0 will still get
+ //assigned first
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource));
+ Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+ Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+ //and only then will app_1
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource));
+ Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+
+ }
@Test
public void testConcurrentAccess() throws Exception {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java
new file mode 100644
index 0000000..e0ccd25
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+
+public class MockSchedulerProcess implements SchedulerProcess {
+
+ private long serialEpoch = 0;
+ private long serial = 0;
+ private String id = "";
+ private Resource consumption = Resources.createResource(0);
+ private Resource demand = Resources.createResource(0);
+ private Resource cachedConsumption = Resources.createResource(0);
+ private Resource cachedDemand = Resources.createResource(0);
+
+ public MockSchedulerProcess() { }
+
+ public void setSerialEpoch(long serialEpoch) {
+ this.serialEpoch = serialEpoch;
+ }
+
+ private long getSerialEpoch() {
+ return serialEpoch;
+ }
+
+ public void setSerial(long serial) {
+ this.serial = serial;
+ }
+
+ private long getSerial() {
+ return serial;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setConsumption(Resource consumption) {
+ this.consumption = consumption;
+ }
+
+ public Resource getCachedConsumption() {
+ return cachedConsumption;
+ }
+
+ public void setDemand(Resource demand) {
+ this.demand = demand;
+ }
+
+ public Resource getCachedDemand() {
+ return cachedDemand;
+ }
+
+ public void updateCaches() {
+ cachedConsumption = consumption;
+ cachedDemand = demand;
+ }
+
+ @Override
+ public int compareInputOrderTo(SchedulerProcess other) {
+ if (other instanceof MockSchedulerProcess) {
+ MockSchedulerProcess r2 = (MockSchedulerProcess) other;
+ int res = (int) Math.signum(getSerialEpoch() - r2.getSerialEpoch());
+ if (res == 0) {
+ res = (int) Math.signum(getSerial() - r2.getSerial());
+ }
+ return res;
+ }
+ return 1;//let other types go before this, if any
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFifo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFifo.java
new file mode 100644
index 0000000..2ce84a3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFifo.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+
+public class TestComparatorPolicyFifo {
+
+ @Test
+ public void testIterators() {
+ SchedulerComparatorPolicy scp =
+ new SchedulerComparatorPolicy(new FifoComparator());
+
+ MockSchedulerProcess msp1 = new MockSchedulerProcess();
+ MockSchedulerProcess msp2 = new MockSchedulerProcess();
+ MockSchedulerProcess msp3 = new MockSchedulerProcess();
+
+ msp1.setSerial(3);
+ msp2.setSerial(2);
+ msp3.setSerial(1);
+
+ msp1.setId("3");
+ msp2.setId("2");
+ msp3.setId("1");
+
+ scp.addSchedulerProcess(msp1);
+ scp.addSchedulerProcess(msp2);
+ scp.addSchedulerProcess(msp3);
+
+ //Assignment, oldest to youngest
+ checkIds(scp.getAssignmentIterator(), new String[]{"1", "2", "3"});
+
+ //Preemption, youngest to oldest
+ checkIds(scp.getPreemptionIterator(), new String[]{"3", "2", "1"});
+
+ }
+
+ public void checkIds(Iterator si,
+ String[] ids) {
+ for (int i = 0;i < ids.length;i++) {
+ Assert.assertEquals(si.next().getId(),
+ ids[i]);
+ }
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java
new file mode 100644
index 0000000..7d903ff
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+
+public class TestFifoComparator {
+
+ @Test
+ public void testFifoComparator() {
+ Comparator comp = new FifoComparator();
+ MockSchedulerProcess r1 = new MockSchedulerProcess();
+ MockSchedulerProcess r2 = new MockSchedulerProcess();
+
+ Assert.assertEquals(comp.compare(r1, r2), 0);
+
+ //serial
+ r1.setSerial(1);
+ Assert.assertEquals(comp.compare(r1, r2), 1);
+
+ //serial epoch
+ r2.setSerialEpoch(1);
+ Assert.assertEquals(comp.compare(r1, r2), -1);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
index eb42679..9d5b5d6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
@@ -347,7 +347,7 @@ private void verifySubQueue(JSONObject info, String q,
int numExpectedElements = 13;
boolean isParentQueue = true;
if (!info.has("queues")) {
- numExpectedElements = 25;
+ numExpectedElements = 26;
isParentQueue = false;
}
assertEquals("incorrect number of elements", numExpectedElements, info.length());