diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 943ecb0..a129f24 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -141,6 +141,14 @@ + + + + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java index bee5275..35e22b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java @@ -55,7 +55,7 @@ public static QueueInfo newInstance(String queueName, float capacity, float maximumCapacity, float currentCapacity, List childQueues, List applications, QueueState queueState, Set accessibleNodeLabels, - String defaultNodeLabelExpression) { + String defaultNodeLabelExpression, String orderingPolicy) { QueueInfo queueInfo = Records.newRecord(QueueInfo.class); queueInfo.setQueueName(queueName); queueInfo.setCapacity(capacity); @@ -66,6 +66,7 @@ public static QueueInfo newInstance(String queueName, float capacity, queueInfo.setQueueState(queueState); queueInfo.setAccessibleNodeLabels(accessibleNodeLabels); queueInfo.setDefaultNodeLabelExpression(defaultNodeLabelExpression); + queueInfo.setOrderingPolicy(orderingPolicy); return queueInfo; } @@ -184,4 +185,13 @@ public static QueueInfo newInstance(String queueName, float capacity, @Stable public abstract void setDefaultNodeLabelExpression( String defaultLabelExpression); + + @Public + @Unstable + public abstract String getOrderingPolicy(); + + @Public + @Unstable + public abstract void setOrderingPolicy( + String orderingPolicy); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index b396f4d..b7375c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -356,6 +356,7 @@ message QueueInfoProto { repeated ApplicationReportProto applications = 7; repeated string accessibleNodeLabels = 8; optional string defaultNodeLabelExpression = 9; + optional string orderingPolicy = 10; } enum QueueACLProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java index 8a5521d..da61a38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java @@ -151,5 +151,12 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) { labelList.append(nodeLabel); } writer.println(labelList.toString()); + + writer.print("\tOrdering Policy : "); + if (null != queueInfo.getOrderingPolicy()) { + writer.println(queueInfo.getOrderingPolicy()); + } else { + writer.println(); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index f468bc1..509b470 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -685,7 +685,7 @@ public YarnClusterMetrics createFakeYarnClusterMetrics() { public QueueInfo createFakeQueueInfo() { return QueueInfo.newInstance("root", 100f, 100f, 50f, null, - createFakeAppReports(), QueueState.RUNNING, null, null); + createFakeAppReports(), QueueState.RUNNING, null, null, null); } public List createFakeQueueUserACLInfoList() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 4b60c52..80f5892 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -1268,7 +1268,7 @@ public void testGetQueueInfo() throws Exception { nodeLabels.add("GPU"); nodeLabels.add("JDK_7"); QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f, - null, null, QueueState.RUNNING, nodeLabels, "GPU"); + null, null, QueueState.RUNNING, nodeLabels, "GPU", null); when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo); int result = cli.run(new String[] { "-status", "queueA" }); assertEquals(0, result); @@ -1283,6 +1283,7 @@ public void testGetQueueInfo() throws Exception { pw.println("\tMaximum Capacity : " + "80.0%"); pw.println("\tDefault Node Label expression : " + "GPU"); pw.println("\tAccessible Node Labels : " + "JDK_7,GPU"); + pw.println("\tOrdering Policy : "); pw.close(); String queueInfoStr = baos.toString("UTF-8"); Assert.assertEquals(queueInfoStr, sysOutStream.toString()); @@ -1292,7 +1293,7 @@ public void testGetQueueInfo() throws Exception { public void testGetQueueInfoWithEmptyNodeLabel() throws Exception { QueueCLI cli = createAndGetQueueCLI(); QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f, - null, null, QueueState.RUNNING, null, null); + null, null, QueueState.RUNNING, null, null, null); when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo); int result = cli.run(new String[] { "-status", "queueA" }); assertEquals(0, result); @@ -1307,6 +1308,7 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception { pw.println("\tMaximum Capacity : " + "80.0%"); pw.println("\tDefault Node Label expression : "); pw.println("\tAccessible Node Labels : "); + pw.println("\tOrdering Policy : "); pw.close(); String queueInfoStr = baos.toString("UTF-8"); Assert.assertEquals(queueInfoStr, sysOutStream.toString()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java index 4b83500..09f437f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java @@ -368,4 +368,21 @@ public void setDefaultNodeLabelExpression(String defaultNodeLabelExpression) { } builder.setDefaultNodeLabelExpression(defaultNodeLabelExpression); } + + @Override + public String getOrderingPolicy() { + QueueInfoProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasOrderingPolicy()) ? p + .getOrderingPolicy() : null; + } + + @Override + public void setOrderingPolicy(String orderingPolicy) { + maybeInitBuilder(); + if (orderingPolicy == null) { + builder.clearOrderingPolicy(); + return; + } + builder.setOrderingPolicy(orderingPolicy); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index 80299c0..d6c0e2b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -482,7 +482,7 @@ public static void setup() throws Exception { // it is recursive(has sub queues) typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f, 1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"), - "x && y")); + "x && y", null)); generateByNewInstance(QueueUserACLInfo.class); generateByNewInstance(YarnClusterMetrics.class); // for reservation system diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 5e0bbc7..0179847 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerProcess; + import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; @@ -69,7 +71,7 @@ */ @Private @Unstable -public class SchedulerApplicationAttempt { +public class SchedulerApplicationAttempt implements SchedulerProcess { private static final Log LOG = LogFactory .getLog(SchedulerApplicationAttempt.class); @@ -669,4 +671,58 @@ public void recordContainerAllocationTime(long value) { public Set getBlacklistedNodes() { return this.appSchedulingInfo.getBlackListCopy(); } + + public Resource getDemand() { + Resource demand = Resources.createResource(0); + // Demand is current consumption plus outstanding requests + Resources.addTo(demand, getCurrentConsumption()); + + // Add up outstanding resource requests + synchronized (this) { + for (Priority p : getPriorities()) { + for (ResourceRequest r : getResourceRequests(p).values()) { + Resource total = Resources.multiply(r.getCapability(), r.getNumContainers()); + Resources.addTo(demand, total); + } + } + } + return demand; + } + + @Override + public int compareInputOrderTo(SchedulerProcess other) { + if (other instanceof SchedulerApplicationAttempt) { + return getApplicationId().compareTo( + ((SchedulerApplicationAttempt)other).getApplicationId()); + } + return 1;//let other types go before this, if any + } + + @Override + public String getId() { + return getApplicationId().toString(); + } + + private Resource cachedConsumption = Resources.createResource(0); + + @Override + public Resource getCachedConsumption() { + return cachedConsumption; + } + + private Resource cachedDemand = Resources.createResource(0); + + @Override + public Resource getCachedDemand() { + return cachedDemand; + } + + @Override + public void updateCaches() { + Resource updConsumption = Resources.createResource(0); + Resources.addTo(updConsumption, getCurrentConsumption()); + cachedConsumption = updConsumption; + cachedDemand = getDemand(); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c86c0ff..351d271 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1473,7 +1473,7 @@ public synchronized void removeQueue(String queueName) // at this point we should have no more apps if (disposableLeafQueue.getNumApplications() > 0) { throw new SchedulerDynamicEditException("The queue " + queueName - + " is not empty " + disposableLeafQueue.getApplications().size() + + " is not empty " + disposableLeafQueue.getNumActiveApplications() + " active apps " + disposableLeafQueue.pendingApplications.size() + " pending apps"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 102e553..e3ff5e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; + + import com.google.common.collect.ImmutableSet; public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { @@ -117,6 +120,14 @@ public static final String MAXIMUM_ALLOCATION_VCORES = "maximum-allocation-vcores"; + public static final String ORDERING_POLICY_CLASS = "ordering-policy-class"; + + public static final String DEFAULT_ORDERING_POLICY_CLASS = ""; + + public static final String ORDERING_POLICY = "ordering-policy"; + + public static final String DEFAULT_ORDERING_POLICY = "fifo"; + @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @@ -373,6 +384,35 @@ public int getUserLimit(String queue) { DEFAULT_USER_LIMIT); return userLimit; } + + @SuppressWarnings("unchecked") + public OrderingPolicy getOrderingPolicy( + String queue) { + + String policyClass = get(getQueuePrefix(queue) + ORDERING_POLICY_CLASS, + DEFAULT_ORDERING_POLICY_CLASS); + + if (policyClass.equals("")) { + policyClass = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerComparatorPolicy"; + } + + String policyConfig = get(getQueuePrefix(queue) + ORDERING_POLICY, + DEFAULT_ORDERING_POLICY); + + OrderingPolicy policy; + try { + policy = (OrderingPolicy) Class.forName(policyClass).newInstance(); + } catch (Exception e) { + String message = "Unable to construct ordering policy for " + policyClass + ", message: " + e.getMessage(); + LOG.error(message, e); + throw new RuntimeException(message, e); + } + if (policy instanceof SchedulerComparatorPolicy) { + ((SchedulerComparatorPolicy)policy).configureComparators(policyConfig); + } + return policy; + } public void setUserLimit(String queue, int userLimit) { setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3e5405d..2ec6627 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -93,7 +94,6 @@ private int nodeLocalityDelay; - Set activeApplications; Map applicationAttemptMap = new HashMap(); @@ -121,6 +121,10 @@ private volatile ResourceLimits currentResourceLimits = null; + private OrderingPolicy + orderingPolicy = + new SchedulerComparatorPolicy(new FifoComparator()); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -137,7 +141,6 @@ public LeafQueue(CapacitySchedulerContext cs, cs.getApplicationComparator(); this.pendingApplications = new TreeSet(applicationComparator); - this.activeApplications = new TreeSet(applicationComparator); setupQueueConfigs(cs.getClusterResource()); } @@ -159,6 +162,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) setQueueResourceLimitsInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + + setOrderingPolicy(conf.getOrderingPolicy(getQueuePath())); + userLimit = conf.getUserLimit(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath()); @@ -322,7 +328,7 @@ public synchronized int getNumPendingApplications() { } public synchronized int getNumActiveApplications() { - return activeApplications.size(); + return orderingPolicy.getSchedulerProcessesSize(); } @Private @@ -363,6 +369,7 @@ public synchronized float getUserLimitFactor() { public synchronized QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { QueueInfo queueInfo = getQueueInfo(); + queueInfo.setOrderingPolicy(getOrderingPolicy().getInfo()); return queueInfo; } @@ -637,7 +644,7 @@ private synchronized void activateApplications() { } } user.activateApplication(); - activeApplications.add(application); + orderingPolicy.addSchedulerProcess(application); queueUsage.incAMUsed(application.getAMResource()); user.getResourceUsage().incAMUsed(application.getAMResource()); i.remove(); @@ -686,7 +693,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) public synchronized void removeApplicationAttempt( FiCaSchedulerApp application, User user) { - boolean wasActive = activeApplications.remove(application); + boolean wasActive = + orderingPolicy.removeSchedulerProcess(application); if (!wasActive) { pendingApplications.remove(application); } else { @@ -755,7 +763,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() - + " #applications=" + activeApplications.size()); + + " #applications=" + + orderingPolicy.getSchedulerProcessesSize()); } // if our queue cannot access this node, just return @@ -775,9 +784,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } } - // Try to assign containers to applications in order - for (FiCaSchedulerApp application : activeApplications) { - + for (Iterator assignmentIterator = + orderingPolicy.getAssignmentIterator(); + assignmentIterator.hasNext();) { + FiCaSchedulerApp application = assignmentIterator.next(); if(LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " + application.getApplicationId()); @@ -1542,6 +1552,9 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod if (allocatedContainer == null) { return Resources.none(); } + + // Inform the ordering policy + orderingPolicy.containerAllocated(application, allocatedContainer); // Inform the node node.allocateContainer(allocatedContainer); @@ -1642,6 +1655,10 @@ public void completedContainer(Resource clusterResource, removed = application.containerCompleted(rmContainer, containerStatus, event, node.getPartition()); + + // Inform the ordering policy + orderingPolicy.containerReleased(application, rmContainer); + node.releaseContainer(container); } @@ -1749,7 +1766,8 @@ public synchronized void updateClusterResource(Resource clusterResource, activateApplications(); // Update application properties - for (FiCaSchedulerApp application : activeApplications) { + for (FiCaSchedulerApp application : + orderingPolicy.getSchedulerProcesses()) { synchronized (application) { computeUserLimitAndSetHeadroom(application, clusterResource, Resources.none(), null); @@ -1855,19 +1873,19 @@ public void recoverContainer(Resource clusterResource, } getParent().recoverContainer(clusterResource, attempt, rmContainer); } - + /** * Obtain (read-only) collection of active applications. */ - public Set getApplications() { - // need to access the list of apps from the preemption monitor - return activeApplications; + public Collection getApplications() { + return orderingPolicy.getSchedulerProcesses(); } // return a single Resource capturing the overal amount of pending resources public synchronized Resource getTotalResourcePending() { Resource ret = BuilderUtils.newResource(0, 0); - for (FiCaSchedulerApp f : activeApplications) { + for (FiCaSchedulerApp f : + orderingPolicy.getSchedulerProcesses()) { Resources.addTo(ret, f.getTotalPendingRequests()); } return ret; @@ -1879,7 +1897,8 @@ public synchronized void collectSchedulerApplications( for (FiCaSchedulerApp pendingApp : pendingApplications) { apps.add(pendingApp.getApplicationAttemptId()); } - for (FiCaSchedulerApp app : activeApplications) { + for (FiCaSchedulerApp app : + orderingPolicy.getSchedulerProcesses()) { apps.add(app.getApplicationAttemptId()); } } @@ -1932,6 +1951,19 @@ public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } + public synchronized OrderingPolicy + getOrderingPolicy() { + return orderingPolicy; + } + + public synchronized void setOrderingPolicy( + OrderingPolicy orderingPolicy) { + orderingPolicy.addAllSchedulerProcesses( + this.orderingPolicy.getSchedulerProcesses() + ); + this.orderingPolicy = orderingPolicy; + } + /* * Holds shared values used by all applications in * the queue to calculate headroom on demand diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index bf2a25b..753c94a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -68,10 +68,9 @@ public static SchedulingPolicy getInstance(Class cla * @param policy canonical class name or "drf" or "fair" or "fifo" * @throws AllocationConfigurationException */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public static SchedulingPolicy parse(String policy) throws AllocationConfigurationException { - @SuppressWarnings("rawtypes") Class clazz; String text = StringUtils.toLowerCase(policy); if (text.equalsIgnoreCase(FairSharePolicy.NAME)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java new file mode 100644 index 0000000..117e1f4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * A SchedulerComparator which supports combining other + * SchedulerComparators together to implement overall logic. + * Comparison occurs in the order of the iterators in the comparators + * member, with the first non-0 value returned + * (0 if all results are 0) + */ +public class CompoundComparator + implements SchedulerComparator { + + private List> comparators = + new ArrayList>(); + + @Override + public int compare(SchedulerProcess r1, SchedulerProcess r2) { + for (SchedulerComparator comparator : comparators) { + int result = comparator.compare(r1, r2); + if (result != 0) return result; + } + return 0; + } + + @Override + public boolean reorderOnContainerAllocate() { + for (SchedulerComparator comparator : comparators) { + boolean result = comparator.reorderOnContainerAllocate(); + if (result) return result; + } + return false; + } + + @Override + public boolean reorderOnContainerRelease() { + for (SchedulerComparator comparator : comparators) { + boolean result = comparator.reorderOnContainerRelease(); + if (result) return result; + } + return false; + } + + public void setComparators( + List> comparators) { + this.comparators = comparators; + } + + public List> getComparators() { + return comparators; + } + + @Override + public void configure(String conf) { + + } + + @Override + public String getInfo() { + StringBuilder info = new StringBuilder(); + info.append("CompoundComparator ( "); + for (Iterator> ic = comparators.iterator(); + ic.hasNext();) { + SchedulerComparator c = ic.next(); + info.append(c.getInfo()); + if (ic.hasNext()) { + info.append(", "); + } + } + info.append(" )"); + return info.toString(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java new file mode 100644 index 0000000..14b58c1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * A comparator which orders SchedulerProcesses by submission order, earlier + * submission is lesser + */ +public class FifoComparator implements SchedulerComparator { + + @Override + public int compare(SchedulerProcess r1, SchedulerProcess r2) { + int res = r1.compareInputOrderTo(r2); + if (res == 0) { + //cannot return equality for different processses, will result in + //"loss". process must always have unique ids to use as a fallback + res = r1.getId().compareTo(r2.getId()); + } + return res; + } + + @Override + public boolean reorderOnContainerAllocate() { + return false; + } + + @Override + public boolean reorderOnContainerRelease() { + return false; + } + + @Override + public void configure(String conf) { + + } + + @Override + public String getInfo() { + return "FifoComparator"; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java new file mode 100644 index 0000000..9529eb6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * An OrderingPolicy is used by the scheduler to order SchedulerProcesses for + * container assignment and preemption + */ +public interface OrderingPolicy { + /* + * Note: Some implementations of this interface depend upon external + * synchronization of all use of the SchedulerProcess Collection and Iterators + * for correctness and to avoid concurrent modification issues, therefore, + * all use of SchedulerProcesses should be externally synchronized + */ + + /** + * Get the collection of SchedulerProcesses which are managed by this + * OrderingPolicy - should include processes returned by the Assignment and + * Preemption iterator with no guarantees regarding order + */ + public Collection getSchedulerProcesses(); + + /** + * Return an iterator over the collection of SchedulerProcesses which orders + * them for container assignment as appropriate for this policy + */ + public Iterator getAssignmentIterator(); + + /** + * Return an iterator over the collection of SchedulerProcesses which orders + * them for preemption as appropriate for this policy + */ + public Iterator getPreemptionIterator(); + + public void containerAllocated(S s, RMContainer c); + + public void containerReleased(S s, RMContainer c); + + /** + * Display information regarding the policy, should include identification of + * policy type & information regarding configuration & status + */ + public String getInfo(); + + public void addSchedulerProcess(S s); + + public boolean removeSchedulerProcess(S s); + + public void addAllSchedulerProcesses(Collection sc); + + public int getSchedulerProcessesSize(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java new file mode 100644 index 0000000..15d9aec --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * An extension of Comparator with additions necessary for use by the + * SchedulerComparatorPolicy + */ +public interface SchedulerComparator + extends Comparator { + + public boolean reorderOnContainerAllocate(); + + public boolean reorderOnContainerRelease(); + + /** + * If the Comparator supports configuration it can be passed here + * @param conf The configuration information + */ + public void configure(String conf); + + /** + * Information about the comparator, should include it's type & potentially + * configuration options & status + */ + public String getInfo(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java new file mode 100644 index 0000000..a122080 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + + +/** + * An implementation of the OrderingPolicy whose behavior is delegated to a + * SchedulerComparator definition. This implementation can be used for cases + * where ordering logic can be expressed effectively in terms of a Comparator + */ +public class SchedulerComparatorPolicy + implements OrderingPolicy { + + private static final Log LOG = LogFactory.getLog(SchedulerComparatorPolicy.class); + + protected TreeSet schedulerProcesses; + SchedulerComparator comparator; + + public SchedulerComparatorPolicy() { } + + public SchedulerComparatorPolicy(SchedulerComparator + comparator) { + setSchedulerComparator(comparator); + } + + @Override + public Collection getSchedulerProcesses() { + return schedulerProcesses; + } + + @Override + public Iterator getAssignmentIterator() { + return schedulerProcesses.iterator(); + } + + @Override + public Iterator getPreemptionIterator() { + return schedulerProcesses.descendingIterator(); + } + + private void reorderSchedulerProcess(S schedulerProcess) { + //remove, update comparable data, and reinsert to update position in order + schedulerProcesses.remove(schedulerProcess); + schedulerProcess.updateCaches(); + schedulerProcesses.add(schedulerProcess); + } + + public void containerAllocated(S schedulerProcess, + RMContainer r) { + if (comparator.reorderOnContainerAllocate()) { + reorderSchedulerProcess(schedulerProcess); + } + } + + public void containerReleased(S schedulerProcess, + RMContainer r) { + if (comparator.reorderOnContainerRelease()) { + reorderSchedulerProcess(schedulerProcess); + } + } + + public void setSchedulerComparator(SchedulerComparator comparator) { + this.comparator = comparator; + TreeSet schedulerProcesses = new TreeSet(comparator); + if (this.schedulerProcesses != null) { + schedulerProcesses.addAll(this.schedulerProcesses); + } + this.schedulerProcesses = schedulerProcesses; + } + + public SchedulerComparator getSchedulerComparator() { + return comparator; + } + + public void configureComparators(String comparatorConfiguration) { + String[] comparatorDefs = comparatorConfiguration.split( + java.util.regex.Pattern.quote("+")); + if (comparatorDefs.length == 1) { + setSchedulerComparator(configureComparator(comparatorDefs[0])); + } else { + CompoundComparator compoundComparator = new CompoundComparator(); + for (int i = 0;i < comparatorDefs.length;i++) { + compoundComparator.getComparators().add( + configureComparator(comparatorDefs[i]) + ); + } + setSchedulerComparator(compoundComparator); + } + } + + public static SchedulerComparator configureComparator( + String comparatorDef) { + + comparatorDef = comparatorDef.trim(); + String[] defComponents = comparatorDef.split(":"); + + String comparatorClass = defComponents[0]; + + if (comparatorClass.equals("fifo")) { + comparatorClass = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator"; + } + if (comparatorClass.equals("compound")) { + comparatorClass = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.CompoundComparator"; + } + SchedulerComparator comparator; + try { + comparator = (SchedulerComparator) + Class.forName(comparatorClass).newInstance(); + } catch (Exception e) { + String message = "Unable to construct scheduler comparator, " + e.getMessage(); + throw new RuntimeException(message, e); + } + if (defComponents.length > 1) { + comparator.configure(defComponents[1]); + } + return comparator; + } + + public String getInfo() { + String info = "SchedulerComparatorPolicy: " + comparator.getInfo(); + return info; + } + + public void addSchedulerProcess(S s) { + schedulerProcesses.add(s); + } + + public boolean removeSchedulerProcess(S s) { + return schedulerProcesses.remove(s); + } + + public void addAllSchedulerProcesses(Collection sc) { + schedulerProcesses.addAll(sc); + } + + public int getSchedulerProcessesSize() { + return schedulerProcesses.size(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java new file mode 100644 index 0000000..f45f756 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; + + +/** + * A scheduler process is a process to be scheduled, i.e. + * an application / application attempt + */ +public interface SchedulerProcess { + + public int compareInputOrderTo(SchedulerProcess other); + + /** + * Id - each process must have a unique id + */ + public String getId(); + + /** + * Cached view of Resources currently consumed by the process + */ + public Resource getCachedConsumption(); + + /** + * Cached view of Resources wanted by the process, is sum of current + * consumption and outstanding resource requests + */ + public Resource getCachedDemand(); + + /** + * Update cached values to current underlying values + */ + public void updateCaches(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index e62fd70..4419877 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -92,6 +92,7 @@ protected void render(Block html) { _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%"). _("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())). _("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())). + _("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()). _("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled"); html._(InfoBlock.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java index 5258b3d..f94a492 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java @@ -39,6 +39,7 @@ protected ResourceInfo usedAMResource; protected ResourceInfo userAMResourceLimit; protected boolean preemptionDisabled; + protected String orderingPolicyInfo; CapacitySchedulerLeafQueueInfo() { }; @@ -57,6 +58,7 @@ usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed()); userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit()); preemptionDisabled = q.getPreemptionDisabled(); + orderingPolicyInfo = q.getOrderingPolicy().getInfo(); } public int getNumActiveApplications() { @@ -107,4 +109,8 @@ public ResourceInfo getUserAMResourceLimit() { public boolean getPreemptionDisabled() { return preemptionDisabled; } + + public String getOrderingPolicyInfo() { + return orderingPolicyInfo; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 1ca5c97..5214723 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -150,11 +150,13 @@ private FiCaSchedulerApp getMockApplication(int appId, String user, FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); + String id = applicationAttemptId.toString(); doReturn(applicationAttemptId.getApplicationId()). when(application).getApplicationId(); doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); doReturn(amResource).when(application).getAMResource(); + doReturn(id).when(application).getId(); return application; } @@ -469,7 +471,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_0)); + assertTrue(queue.getApplications().contains(app_0)); // Submit second application FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, @@ -479,7 +481,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_1)); + assertTrue(queue.getApplications().contains(app_1)); // Submit third application, should remain pending FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, @@ -508,7 +510,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); assertFalse(queue.pendingApplications.contains(app_2)); - assertFalse(queue.activeApplications.contains(app_2)); + assertFalse(queue.getApplications().contains(app_2)); // Finish 1st application, app_3 should become active queue.finishApplicationAttempt(app_0, A); @@ -516,9 +518,9 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_3)); + assertTrue(queue.getApplications().contains(app_3)); assertFalse(queue.pendingApplications.contains(app_3)); - assertFalse(queue.activeApplications.contains(app_0)); + assertFalse(queue.getApplications().contains(app_0)); // Finish 2nd application queue.finishApplicationAttempt(app_1, A); @@ -526,7 +528,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertFalse(queue.activeApplications.contains(app_1)); + assertFalse(queue.getApplications().contains(app_1)); // Finish 4th application queue.finishApplicationAttempt(app_3, A); @@ -534,7 +536,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(0, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertFalse(queue.activeApplications.contains(app_3)); + assertFalse(queue.getApplications().contains(app_3)); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 972cabb..8b4bacb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -73,6 +73,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerComparatorPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.CompoundComparator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerComparator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerProcess; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -381,6 +387,45 @@ public void testUserQueueAcl() throws Exception { d.submitApplicationAttempt(app_1, user_d); // same user } + @Test + public void testPolicyConfiguration() throws Exception { + + CapacitySchedulerConfiguration testConf = + new CapacitySchedulerConfiguration(); + + String tproot = CapacitySchedulerConfiguration.ROOT + "." + + "testPolicyRoot" + System.currentTimeMillis(); + + OrderingPolicy policy = + testConf.getOrderingPolicy(tproot); + + //tests for expected defaults + SchedulerComparatorPolicy comPol = + (SchedulerComparatorPolicy) policy; + FifoComparator fcomp = (FifoComparator) comPol.getSchedulerComparator(); + + //override comparator default to compound + String comparatorConfig = CapacitySchedulerConfiguration.PREFIX + tproot + + "." + CapacitySchedulerConfiguration.ORDERING_POLICY; + + //set multiple fifo's (just for testing), this will result in a + //compound containing multiple fifos + testConf.set(comparatorConfig, "fifo+fifo"); + policy = + testConf.getOrderingPolicy(tproot); + comPol = + (SchedulerComparatorPolicy) policy; + CompoundComparator ccomp = (CompoundComparator) comPol.getSchedulerComparator(); + + List> comparators = + ccomp.getComparators(); + assertEquals(2, comparators.size()); + for (SchedulerComparator comparator : comparators) { + assertTrue(comparator instanceof FifoComparator); + } + + + } @Test public void testAppAttemptMetrics() throws Exception { @@ -2011,7 +2056,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { e.submitApplicationAttempt(app_2, user_e); // same user // before reinitialization - assertEquals(2, e.activeApplications.size()); + assertEquals(2, e.getNumActiveApplications()); assertEquals(1, e.pendingApplications.size()); csConf.setDouble(CapacitySchedulerConfiguration @@ -2028,7 +2073,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { root.reinitialize(newRoot, csContext.getClusterResource()); // after reinitialization - assertEquals(3, e.activeApplications.size()); + assertEquals(3, e.getNumActiveApplications()); assertEquals(0, e.pendingApplications.size()); } @@ -2092,7 +2137,7 @@ public void testActivateApplicationByUpdatingClusterResource() e.submitApplicationAttempt(app_2, user_e); // same user // before updating cluster resource - assertEquals(2, e.activeApplications.size()); + assertEquals(2, e.getNumActiveApplications()); assertEquals(1, e.pendingApplications.size()); Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); @@ -2100,7 +2145,7 @@ public void testActivateApplicationByUpdatingClusterResource() new ResourceLimits(clusterResource)); // after updating cluster resource - assertEquals(3, e.activeApplications.size()); + assertEquals(3, e.getNumActiveApplications()); assertEquals(0, e.pendingApplications.size()); } @@ -2451,6 +2496,84 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() + "forget to set off-switch request should be handled"); } } + + @Test + public void testFifoAssignment() throws Exception { + + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + a.setOrderingPolicy( + new SchedulerComparatorPolicy(new FifoComparator())); + + String host_0_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB); + + final int numNodes = 4; + Resource clusterResource = Resources.createResource( + numNodes * (16*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + String user_0 = "user_0"; + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_1, user_0); + + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + List app_1_requests_0 = new ArrayList(); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); + Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + //Even thought it already has more resources, app_0 will still get + //assigned first + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); + Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); + Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + //and only then will app_1 + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); + Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + + } @Test public void testConcurrentAccess() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java new file mode 100644 index 0000000..e0ccd25 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + + +public class MockSchedulerProcess implements SchedulerProcess { + + private long serialEpoch = 0; + private long serial = 0; + private String id = ""; + private Resource consumption = Resources.createResource(0); + private Resource demand = Resources.createResource(0); + private Resource cachedConsumption = Resources.createResource(0); + private Resource cachedDemand = Resources.createResource(0); + + public MockSchedulerProcess() { } + + public void setSerialEpoch(long serialEpoch) { + this.serialEpoch = serialEpoch; + } + + private long getSerialEpoch() { + return serialEpoch; + } + + public void setSerial(long serial) { + this.serial = serial; + } + + private long getSerial() { + return serial; + } + + public void setId(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public void setConsumption(Resource consumption) { + this.consumption = consumption; + } + + public Resource getCachedConsumption() { + return cachedConsumption; + } + + public void setDemand(Resource demand) { + this.demand = demand; + } + + public Resource getCachedDemand() { + return cachedDemand; + } + + public void updateCaches() { + cachedConsumption = consumption; + cachedDemand = demand; + } + + @Override + public int compareInputOrderTo(SchedulerProcess other) { + if (other instanceof MockSchedulerProcess) { + MockSchedulerProcess r2 = (MockSchedulerProcess) other; + int res = (int) Math.signum(getSerialEpoch() - r2.getSerialEpoch()); + if (res == 0) { + res = (int) Math.signum(getSerial() - r2.getSerial()); + } + return res; + } + return 1;//let other types go before this, if any + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFifo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFifo.java new file mode 100644 index 0000000..2ce84a3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFifo.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + +public class TestComparatorPolicyFifo { + + @Test + public void testIterators() { + SchedulerComparatorPolicy scp = + new SchedulerComparatorPolicy(new FifoComparator()); + + MockSchedulerProcess msp1 = new MockSchedulerProcess(); + MockSchedulerProcess msp2 = new MockSchedulerProcess(); + MockSchedulerProcess msp3 = new MockSchedulerProcess(); + + msp1.setSerial(3); + msp2.setSerial(2); + msp3.setSerial(1); + + msp1.setId("3"); + msp2.setId("2"); + msp3.setId("1"); + + scp.addSchedulerProcess(msp1); + scp.addSchedulerProcess(msp2); + scp.addSchedulerProcess(msp3); + + //Assignment, oldest to youngest + checkIds(scp.getAssignmentIterator(), new String[]{"1", "2", "3"}); + + //Preemption, youngest to oldest + checkIds(scp.getPreemptionIterator(), new String[]{"3", "2", "1"}); + + } + + public void checkIds(Iterator si, + String[] ids) { + for (int i = 0;i < ids.length;i++) { + Assert.assertEquals(si.next().getId(), + ids[i]); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java new file mode 100644 index 0000000..7d903ff --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + +public class TestFifoComparator { + + @Test + public void testFifoComparator() { + Comparator comp = new FifoComparator(); + MockSchedulerProcess r1 = new MockSchedulerProcess(); + MockSchedulerProcess r2 = new MockSchedulerProcess(); + + Assert.assertEquals(comp.compare(r1, r2), 0); + + //serial + r1.setSerial(1); + Assert.assertEquals(comp.compare(r1, r2), 1); + + //serial epoch + r2.setSerialEpoch(1); + Assert.assertEquals(comp.compare(r1, r2), -1); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index eb42679..9d5b5d6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -347,7 +347,7 @@ private void verifySubQueue(JSONObject info, String q, int numExpectedElements = 13; boolean isParentQueue = true; if (!info.has("queues")) { - numExpectedElements = 25; + numExpectedElements = 26; isParentQueue = false; } assertEquals("incorrect number of elements", numExpectedElements, info.length());