diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 738f527..c91127c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -539,9 +539,7 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, // lock the leafqueue while we scan applications and unreserve synchronized (qT.leafQueue) { - NavigableSet ns = - (NavigableSet) qT.leafQueue.getApplications(); - Iterator desc = ns.descendingIterator(); + Iterator desc = qT.leafQueue.getOrderingPolicy().getPreemptionIterator(); qT.actuallyPreempted = Resources.clone(resToObtain); while (desc.hasNext()) { FiCaSchedulerApp fc = desc.next(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index ed78097..7dc3034 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulerProcess; + import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; @@ -68,7 +70,7 @@ */ @Private @Unstable -public class SchedulerApplicationAttempt { +public class SchedulerApplicationAttempt implements SchedulerProcess { private static final Log LOG = LogFactory .getLog(SchedulerApplicationAttempt.class); @@ -414,6 +416,7 @@ public synchronized void showRequests() { } } + @Override public Resource getCurrentConsumption() { return currentConsumption; } @@ -632,4 +635,15 @@ public void incNumAllocatedContainers(NodeType containerType, requestType); } } + + @Override + public long getSerialEpoch() { + return getApplicationId().getClusterTimestamp(); + } + + @Override + public long getSerial() { + return getApplicationId().getId(); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 28ce264..5b3d170 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1461,7 +1461,7 @@ public synchronized void removeQueue(String queueName) // at this point we should have no more apps if (disposableLeafQueue.getNumApplications() > 0) { throw new SchedulerDynamicEditException("The queue " + queueName - + " is not empty " + disposableLeafQueue.getApplications().size() + + " is not empty " + disposableLeafQueue.getNumActiveApplications() + " active apps " + disposableLeafQueue.pendingApplications.size() + " pending apps"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a607a62..a5d22f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -94,7 +95,6 @@ private int nodeLocalityDelay; - Set activeApplications; Map applicationAttemptMap = new HashMap(); @@ -122,6 +122,10 @@ private volatile ResourceLimits currentResourceLimits = null; + private OrderingPolicy + orderingPolicy = + new SchedulerComparatorPolicy(new FifoComparator()); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -138,7 +142,6 @@ public LeafQueue(CapacitySchedulerContext cs, cs.getApplicationComparator(); this.pendingApplications = new TreeSet(applicationComparator); - this.activeApplications = new TreeSet(applicationComparator); setupQueueConfigs(cs.getClusterResource()); } @@ -323,7 +326,7 @@ public synchronized int getNumPendingApplications() { } public synchronized int getNumActiveApplications() { - return activeApplications.size(); + return orderingPolicy.getSchedulerProcesses().size(); } @Private @@ -635,7 +638,7 @@ private synchronized void activateApplications() { } } user.activateApplication(); - activeApplications.add(application); + orderingPolicy.getSchedulerProcesses().add(application); queueUsage.incAMUsed(application.getAMResource()); user.getResourceUsage().incAMUsed(application.getAMResource()); i.remove(); @@ -684,7 +687,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) public synchronized void removeApplicationAttempt( FiCaSchedulerApp application, User user) { - boolean wasActive = activeApplications.remove(application); + boolean wasActive = + orderingPolicy.getSchedulerProcesses().remove(application); if (!wasActive) { pendingApplications.remove(application); } else { @@ -745,7 +749,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() - + " #applications=" + activeApplications.size()); + + " #applications=" + + orderingPolicy.getSchedulerProcesses().size()); } // if our queue cannot access this node, just return @@ -765,9 +770,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } } - // Try to assign containers to applications in order - for (FiCaSchedulerApp application : activeApplications) { - + for (Iterator assignmentIterator = + orderingPolicy.getAssignmentIterator(); + assignmentIterator.hasNext();) { + FiCaSchedulerApp application = assignmentIterator.next(); if(LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " + application.getApplicationId()); @@ -864,6 +870,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, application.resetSchedulingOpportunities(priority); } + // Inform the ordering policy + orderingPolicy.handleSchedulerProcessEvent(application, + SchedulerProcess.SchedulerProcessEvent.CONTAINER_ASSIGNED); + // Done return assignment; } else { @@ -1807,7 +1817,8 @@ public synchronized void updateClusterResource(Resource clusterResource, activateApplications(); // Update application properties - for (FiCaSchedulerApp application : activeApplications) { + for (FiCaSchedulerApp application : + orderingPolicy.getSchedulerProcesses()) { synchronized (application) { computeUserLimitAndSetHeadroom(application, clusterResource, Resources.none(), null); @@ -1904,19 +1915,19 @@ public void recoverContainer(Resource clusterResource, } getParent().recoverContainer(clusterResource, attempt, rmContainer); } - + /** * Obtain (read-only) collection of active applications. */ - public Set getApplications() { - // need to access the list of apps from the preemption monitor - return activeApplications; + public Collection getApplications() { + return orderingPolicy.getSchedulerProcesses(); } // return a single Resource capturing the overal amount of pending resources public synchronized Resource getTotalResourcePending() { Resource ret = BuilderUtils.newResource(0, 0); - for (FiCaSchedulerApp f : activeApplications) { + for (FiCaSchedulerApp f : + orderingPolicy.getSchedulerProcesses()) { Resources.addTo(ret, f.getTotalPendingRequests()); } return ret; @@ -1928,7 +1939,8 @@ public synchronized void collectSchedulerApplications( for (FiCaSchedulerApp pendingApp : pendingApplications) { apps.add(pendingApp.getApplicationAttemptId()); } - for (FiCaSchedulerApp app : activeApplications) { + for (FiCaSchedulerApp app : + orderingPolicy.getSchedulerProcesses()) { apps.add(app.getApplicationAttemptId()); } } @@ -1981,6 +1993,19 @@ public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } + public synchronized OrderingPolicy + getOrderingPolicy() { + return orderingPolicy; + } + + public synchronized void setOrderingPolicy( + OrderingPolicy orderingPolicy) { + orderingPolicy.getSchedulerProcesses().addAll( + this.orderingPolicy.getSchedulerProcesses() + ); + this.orderingPolicy = orderingPolicy; + } + /* * Holds shared values used by all applications in * the queue to calculate headroom on demand diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java new file mode 100644 index 0000000..8e68473 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * A SchedulerComparator which supports combining other + * SchedulerComparators together to implement overall logic. + * Comparison occurs in the order of the iterators in the comparators + * member, with the first non-0 value returned + * (0 if all results are 0) + */ +public class CompoundComparator + implements SchedulerComparator { + + private List> comparators = + new ArrayList>(); + + @Override + public int compare(SchedulerProcess r1, SchedulerProcess r2) { + for (SchedulerComparator comparator : comparators) { + int result = comparator.compare(r1, r2); + if (result != 0) return result; + } + return 0; + } + + @Override + public boolean isReorderingEvent( + SchedulerProcess.SchedulerProcessEvent event) { + for (SchedulerComparator comparator : comparators) { + boolean result = comparator.isReorderingEvent(event); + if (result) return result; + } + return false; + } + + public void setComparators( + List> comparators) { + this.comparators = comparators; + } + + public List> getComparators() { + return comparators; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java new file mode 100644 index 0000000..af148ba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * A comparator which orders SchedulerProcesses by submission order, earlier + * submission is lesser + */ +public class FifoComparator implements SchedulerComparator { + + @Override + public int compare(SchedulerProcess r1, SchedulerProcess r2) { + int res = (int) Math.signum(r1.getSerialEpoch() - r2.getSerialEpoch()); + if (res == 0) { + res = (int) Math.signum(r1.getSerial() - r2.getSerial()); + } + return res; + } + + @Override + public boolean isReorderingEvent( + SchedulerProcess.SchedulerProcessEvent event) { + return false; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java new file mode 100644 index 0000000..f6e8af6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * An OrderingPolicy is used by the scheduler to order SchedulerProcesses for + * container assignment and preemption + */ +public interface OrderingPolicy { + /* + * Note: Some implementations of this interface depend upon external + * synchronization of all use of the SchedulerProcess Collection and Iterators + * for correctness and to avoid concurrent modification issues, therefore, + * all use of SchedulerProcesses should be externally synchronized + */ + + /** + * Get the collection of SchedulerProcesses which are managed by this + * OrderingPolicy - should include processes returned by the Assignment and + * Preemption iterator with no guarantees regarding order + */ + public Collection getSchedulerProcesses(); + + /** + * Return an iterator over the collection of SchedulerProcesses which orders + * them for container assignment as appropriate for this policy + */ + public Iterator getAssignmentIterator(); + + /** + * Return an iterator over the collection of SchedulerProcesses which orders + * them for preemption as appropriate for this policy + */ + public Iterator getPreemptionIterator(); + + /** + * Handle a schedulerProcessEvent for the given schedulerProcess, which + * must be contained in the SchedulerProcesses collection. Used for + * re-ordering a SchedulerProcess if it has changed such that its + * policy based position for container assignment or preemption may have + * changed + */ + public void handleSchedulerProcessEvent(S schedulerProcess, + SchedulerProcess.SchedulerProcessEvent event); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java new file mode 100644 index 0000000..d6b69ca --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * An extension of Comparator with additions necessary for use by the + * SchedulerComparatorPolicy + */ +public interface SchedulerComparator + extends Comparator { + + /** + * Can the passed event effect the relative comparison value of + * a SchedulerProcess + * @param event The event which has occurred + * @return true if so, otherwise false + * + */ + public boolean isReorderingEvent( + SchedulerProcess.SchedulerProcessEvent event); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java new file mode 100644 index 0000000..16cf9e0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerComparatorPolicy.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + + +/** + * An implementation of the OrderingPolicy whose behavior is delegated to a + * SchedulerComparator definition. This implementation can be used for cases + * where ordering logic can be expressed effectively in terms of a Comparator + */ +public class SchedulerComparatorPolicy + implements OrderingPolicy { + + protected TreeSet schedulerProcesses; + SchedulerComparator comparator; + + public SchedulerComparatorPolicy(SchedulerComparator + comparator) { + this.comparator = comparator; + schedulerProcesses = new TreeSet(comparator); + } + + @Override + public Collection getSchedulerProcesses() { + return schedulerProcesses; + } + + @Override + public Iterator getAssignmentIterator() { + return schedulerProcesses.iterator(); + } + + @Override + public Iterator getPreemptionIterator() { + return schedulerProcesses.descendingIterator(); + } + + public void handleSchedulerProcessEvent(S schedulerProcess, + SchedulerProcess.SchedulerProcessEvent event) { + if (comparator.isReorderingEvent(event)) { + //remove and reinsert to update position in order + schedulerProcesses.remove(schedulerProcess); + schedulerProcesses.add(schedulerProcess); + } + } + + public void setSchedulerComparator(SchedulerComparator comparator) { + this.comparator = comparator; + TreeSet schedulerProcesses = new TreeSet(comparator); + schedulerProcesses.addAll(this.schedulerProcesses); + this.schedulerProcesses = schedulerProcesses; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java new file mode 100644 index 0000000..e59f104 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulerProcess.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; + + +/** + * A scheduler process is a process to be scheduled, i.e. + * an application / application attempt + */ +public interface SchedulerProcess { + + /** + * SchedulerProcess Events of potential interest OrderingPolicies + */ + public enum SchedulerProcessEvent { + CONTAINER_ASSIGNED, + CONTAINER_COMPLETED, + CONTAINER_PREEMPTED + } + + /** + * The Epoch of the serial, i.e. Cluster Start Time when process was + * submitted + */ + public long getSerialEpoch(); + + /** + * Indicates order of submission within the Epoch + */ + public long getSerial(); + + /** + * Resources currently consumed by the process + */ + public Resource getCurrentConsumption(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 696b9bb..0a6876f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -44,6 +44,7 @@ import java.util.Deque; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -78,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -1032,6 +1034,11 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { .thenReturn(appAttemptIdList); } when(lq.getApplications()).thenReturn(qApps); + Iterator pi = qApps.descendingIterator(); + SchedulerComparatorPolicy ap = mock(SchedulerComparatorPolicy.class); + when(ap.getPreemptionIterator()).thenReturn(pi); + when(ap.getSchedulerProcesses()).thenReturn(qApps); + when(lq.getOrderingPolicy()).thenReturn(ap); if(setAMResourcePercent != 0.0f){ when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 8cad057..7a3ae90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -150,11 +150,13 @@ private FiCaSchedulerApp getMockApplication(int appId, String user, FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); + long appIdL = (long) appId; doReturn(applicationAttemptId.getApplicationId()). when(application).getApplicationId(); doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); doReturn(amResource).when(application).getAMResource(); + doReturn(appIdL).when(application).getSerial(); return application; } @@ -469,7 +471,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_0)); + assertTrue(queue.getApplications().contains(app_0)); // Submit second application FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, @@ -479,7 +481,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_1)); + assertTrue(queue.getApplications().contains(app_1)); // Submit third application, should remain pending FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, @@ -508,7 +510,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); assertFalse(queue.pendingApplications.contains(app_2)); - assertFalse(queue.activeApplications.contains(app_2)); + assertFalse(queue.getApplications().contains(app_2)); // Finish 1st application, app_3 should become active queue.finishApplicationAttempt(app_0, A); @@ -516,9 +518,9 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_3)); + assertTrue(queue.getApplications().contains(app_3)); assertFalse(queue.pendingApplications.contains(app_3)); - assertFalse(queue.activeApplications.contains(app_0)); + assertFalse(queue.getApplications().contains(app_0)); // Finish 2nd application queue.finishApplicationAttempt(app_1, A); @@ -526,7 +528,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertFalse(queue.activeApplications.contains(app_1)); + assertFalse(queue.getApplications().contains(app_1)); // Finish 4th application queue.finishApplicationAttempt(app_3, A); @@ -534,7 +536,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertEquals(0, queue.getNumPendingApplications()); assertEquals(0, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertFalse(queue.activeApplications.contains(app_3)); + assertFalse(queue.getApplications().contains(app_3)); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index a5a2e5f..3230d26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -2011,7 +2012,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { e.submitApplicationAttempt(app_2, user_e); // same user // before reinitialization - assertEquals(2, e.activeApplications.size()); + assertEquals(2, e.getNumActiveApplications()); assertEquals(1, e.pendingApplications.size()); csConf.setDouble(CapacitySchedulerConfiguration @@ -2028,7 +2029,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { root.reinitialize(newRoot, csContext.getClusterResource()); // after reinitialization - assertEquals(3, e.activeApplications.size()); + assertEquals(3, e.getNumActiveApplications()); assertEquals(0, e.pendingApplications.size()); } @@ -2092,7 +2093,7 @@ public void testActivateApplicationByUpdatingClusterResource() e.submitApplicationAttempt(app_2, user_e); // same user // before updating cluster resource - assertEquals(2, e.activeApplications.size()); + assertEquals(2, e.getNumActiveApplications()); assertEquals(1, e.pendingApplications.size()); Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); @@ -2100,7 +2101,7 @@ public void testActivateApplicationByUpdatingClusterResource() new ResourceLimits(clusterResource)); // after updating cluster resource - assertEquals(3, e.activeApplications.size()); + assertEquals(3, e.getNumActiveApplications()); assertEquals(0, e.pendingApplications.size()); } @@ -2451,6 +2452,84 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified() + "forget to set off-switch request should be handled"); } } + + @Test + public void testFifoAssignment() throws Exception { + + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + a.setOrderingPolicy( + new SchedulerComparatorPolicy(new FifoComparator())); + + String host_0_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB); + + final int numNodes = 4; + Resource clusterResource = Resources.createResource( + numNodes * (16*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + String user_0 = "user_0"; + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_1, user_0); + + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + List app_1_requests_0 = new ArrayList(); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + a.assignContainers(clusterResource, node_0_0, false, new ResourceLimits(clusterResource)); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + a.assignContainers(clusterResource, node_0_0, false, new ResourceLimits(clusterResource)); + Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + //Even thought it already has more resources, app_0 will still get + //assigned first + a.assignContainers(clusterResource, node_0_0, false, new ResourceLimits(clusterResource)); + Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); + Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + //and only then will app_1 + a.assignContainers(clusterResource, node_0_0, false, new ResourceLimits(clusterResource)); + Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + + } @Test public void testConcurrentAccess() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java new file mode 100644 index 0000000..25f6576 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulerProcess.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + + +/** + */ +public class MockSchedulerProcess implements SchedulerProcess { + + private Priority priority; + private long serialEpoch; + private long serial; + private String name; + private Resource currentConsumption; + + public MockSchedulerProcess() { + setPriority(Priority.newInstance(1)); + setSerialEpoch(0); + setSerial(0); + setName("a"); + setCurrentConsumption(Resources.createResource(0, 0)); + } + + public void setPriority(Priority priority) { + this.priority = priority; + } + + public Priority getPriority() { + return priority; + } + + public void setSerialEpoch(long serialEpoch) { + this.serialEpoch = serialEpoch; + } + + public long getSerialEpoch() { + return serialEpoch; + } + + public void setSerial(long serial) { + this.serial = serial; + } + + public long getSerial() { + return serial; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setCurrentConsumption(Resource currentConsumption) { + this.currentConsumption = currentConsumption; + } + + public Resource getCurrentConsumption() { + return currentConsumption; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java new file mode 100644 index 0000000..120a662 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoComparator.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + + +/** + */ +public class TestFifoComparator { + + @Test + public void testFifoComparator() { + Comparator comp = new FifoComparator(); + MockSchedulerProcess r1 = new MockSchedulerProcess(); + MockSchedulerProcess r2 = new MockSchedulerProcess(); + + Assert.assertEquals(comp.compare(r1, r2), 0); + + //serial + r1.setSerial(1); + Assert.assertEquals(comp.compare(r1, r2), 1); + + //serial epoch + r2.setSerialEpoch(1); + Assert.assertEquals(comp.compare(r1, r2), -1); + } + +}