diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java new file mode 100644 index 0000000..15279c0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + + +/** + * OrderingPolicy is used by the scheduler to order SchedulableEntities for + * container assignment and preemption + */ +public abstract class AbstractComparatorOrderingPolicy implements OrderingPolicy { + /* + * Note: OrderingPolicy depends upon external + * synchronization of all use of the SchedulableEntity Collection and + * Iterators for correctness and to avoid concurrent modification issues + */ + + private static final Log LOG = LogFactory.getLog(OrderingPolicy.class); + + protected static class CompoundComparator implements Comparator { + + List> comparators; + + CompoundComparator(List> comparators) { + this.comparators = comparators; + } + + @Override + public int compare(SchedulableEntity r1, SchedulableEntity r2) { + for (Comparator comparator : comparators) { + int result = comparator.compare(r1, r2); + if (result != 0) return result; + } + return 0; + } + } + + protected TreeSet schedulableEntities; + protected Comparator comparator; + + public AbstractComparatorOrderingPolicy() { } + + /** + * Get the collection of SchedulableEntities which are managed by this + * OrderingPolicy - should include processes returned by the Assignment and + * Preemption iterator with no guarantees regarding order + */ + public Collection getSchedulableEntities() { + return schedulableEntities; + } + + /** + * Return an iterator over the collection of SchedulableEntities which orders + * them for container assignment + */ + public Iterator getAssignmentIterator() { + return schedulableEntities.iterator(); + } + + /** + * Return an iterator over the collection of SchedulableEntities which orders + * them for preemption + */ + public Iterator getPreemptionIterator() { + return schedulableEntities.descendingIterator(); + } + + protected void reorderSchedulableEntity(S schedulableEntity) { + //remove, update comparable data, and reinsert to update position in order + schedulableEntities.remove(schedulableEntity); + schedulableEntity.updateSchedulingState(); + schedulableEntities.add(schedulableEntity); + } + + public void setComparator(Comparator comparator) { + this.comparator = comparator; + TreeSet schedulableEntities = new TreeSet(comparator); + if (this.schedulableEntities != null) { + schedulableEntities.addAll(this.schedulableEntities); + } + this.schedulableEntities = schedulableEntities; + } + + public Comparator getComparator() { + return comparator; + } + + /** + * Add a SchedulableEntity to be managed for allocation and preemption + * ordering + */ + public void addSchedulableEntity(S s) { + schedulableEntities.add(s); + } + + /** + * Remove a SchedulableEntity from management for allocation and preemption + * ordering + */ + public boolean removeSchedulableEntity(S s) { + return schedulableEntities.remove(s); + } + + /** + * Add a collection of SchedulableEntities to be managed for allocation + * and preemption ordering + */ + public void addAllSchedulableEntities(Collection sc) { + schedulableEntities.addAll(sc); + } + + /** + * Get the number of SchedulableEntities managed for allocation and + * preemption ordering + */ + public int getNumSchedulableEntities() { + return schedulableEntities.size(); + } + + public abstract void configure(String conf); + + /** + * The passed SchedulableEntity has been allocated the passed Container, + * take appropriate action (depending on comparator, a reordering of the + * SchedulableEntity may be required) + */ + public abstract void containerAllocated(S schedulableEntity, + RMContainer r); + + /** + * The passed SchedulableEntity has released the passed Container, + * take appropriate action (depending on comparator, a reordering of the + * SchedulableEntity may be required) + */ + public abstract void containerReleased(S schedulableEntity, + RMContainer r); + + /** + * Display comparator type & information regarding configuration & status + */ + public abstract String getInfo(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java new file mode 100644 index 0000000..dd86105 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * An OrderingPolicy which orders SchedulableEntities by input order + */ +public class FifoOrderingPolicy extends AbstractComparatorOrderingPolicy { + + static class FifoComparator + implements Comparator { + @Override + public int compare(SchedulableEntity r1, SchedulableEntity r2) { + int res = r1.compareInputOrderTo(r2); + return res; + } + } + + public FifoOrderingPolicy() { + setComparator(new FifoComparator()); + } + + @Override + public void configure(String conf) { + + } + + @Override + public void containerAllocated(S schedulableEntity, + RMContainer r) { + } + + @Override + public void containerReleased(S schedulableEntity, + RMContainer r) { + } + + @Override + public String getInfo() { + return "FifoOrderingPolicy"; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java new file mode 100644 index 0000000..37c87a0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + + +/** + * OrderingPolicy is used by the scheduler to order SchedulableEntities for + * container assignment and preemption + */ +public interface OrderingPolicy { + /* + * Note: OrderingPolicy depends upon external + * synchronization of all use of the SchedulableEntity Collection and + * Iterators for correctness and to avoid concurrent modification issues + */ + + /** + * Get the collection of SchedulableEntities which are managed by this + * OrderingPolicy - should include processes returned by the Assignment and + * Preemption iterator with no guarantees regarding order + */ + public Collection getSchedulableEntities(); + + /** + * Return an iterator over the collection of SchedulableEntities which orders + * them for container assignment + */ + public Iterator getAssignmentIterator(); + + /** + * Return an iterator over the collection of SchedulableEntities which orders + * them for preemption + */ + public Iterator getPreemptionIterator(); + + public void setComparator(Comparator comparator); + + public Comparator getComparator(); + + /** + * Add a SchedulableEntity to be managed for allocation and preemption + * ordering + */ + public void addSchedulableEntity(S s); + + /** + * Remove a SchedulableEntity from management for allocation and preemption + * ordering + */ + public boolean removeSchedulableEntity(S s); + + /** + * Add a collection of SchedulableEntities to be managed for allocation + * and preemption ordering + */ + public void addAllSchedulableEntities(Collection sc); + + /** + * Get the number of SchedulableEntities managed for allocation and + * preemption ordering + */ + public int getNumSchedulableEntities(); + + public void configure(String conf); + + /** + * The passed SchedulableEntity has been allocated the passed Container, + * take appropriate action (depending on comparator, a reordering of the + * SchedulableEntity may be required) + */ + public void containerAllocated(S schedulableEntity, + RMContainer r); + + /** + * The passed SchedulableEntity has released the passed Container, + * take appropriate action (depending on comparator, a reordering of the + * SchedulableEntity may be required) + */ + public void containerReleased(S schedulableEntity, + RMContainer r); + + /** + * Display comparator type & information regarding configuration & status + */ + public String getInfo(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java new file mode 100644 index 0000000..1761f0f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; + + +/** + * A SchedulableEntity is a process to be scheduled, + * for example, an application / application attempt + */ +public interface SchedulableEntity { + + /** + * Compare the passed SchedulableEntity to this one for input order. + * Input order is implementation defined and should reflect the + * correct ordering for first-in first-out processing + */ + public int compareInputOrderTo(SchedulableEntity other); + + /** + * View of Resources consumed by the process who's mutation is managed for + * the scheduler through updateSchedulingState. + */ + public Resource getSchedulingConsumption(); + + /** + * View of Resources wanted by the process who's mutation is managed for + * the scheduler through updateSchedulingState. + * Is the total of current consumption and outstanding resource requests. + */ + public Resource getSchedulingDemand(); + + /** + * Use to manage SchedulingConsumption, SchedulingDemand, and any other + * mutable values to insure ordering consistency. Aforementioned properties + * should not change except when this method is called + */ + public void updateSchedulingState(); + +}