diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 36ee4da..e3db816 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -302,6 +302,22 @@ private void _dec(String label, ResourceType type, Resource res) { } } + public Resource getDemand() { + return getDemand(NL); + } + + public Resource getDemand(String label) { + try { + readLock.lock(); + Resource demand = Resources.createResource(0); + Resources.addTo(demand, getUsed(label)); + Resources.addTo(demand, getPending(label)); + return demand; + } finally { + readLock.unlock(); + } + } + @Override public String toString() { try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 5e0bbc7..79edf33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; + import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; @@ -69,7 +71,7 @@ */ @Private @Unstable -public class SchedulerApplicationAttempt { +public class SchedulerApplicationAttempt implements SchedulableEntity { private static final Log LOG = LogFactory .getLog(SchedulerApplicationAttempt.class); @@ -669,4 +671,51 @@ public void recordContainerAllocationTime(long value) { public Set getBlacklistedNodes() { return this.appSchedulingInfo.getBlackListCopy(); } + + private Resource getDemand() { + Resource demand = Resources.createResource(0); + // Demand is current consumption plus outstanding requests + Resources.addTo(demand, getCurrentConsumption()); + + // Add up outstanding resource requests + synchronized (this) { + for (Priority p : getPriorities()) { + for (ResourceRequest r : getResourceRequests(p).values()) { + Resource total = Resources.multiply(r.getCapability(), r.getNumContainers()); + Resources.addTo(demand, total); + } + } + } + return demand; + } + + @Override + public String getId() { + return getApplicationId().toString(); + } + + @Override + public int compareInputOrderTo(SchedulableEntity other) { + if (other instanceof SchedulerApplicationAttempt) { + return getApplicationId().compareTo( + ((SchedulerApplicationAttempt)other).getApplicationId()); + } + return 1;//let other types go before this, if any + } + + private ResourceUsage schedulingResourceUsage = new ResourceUsage(); + + @Override + public synchronized ResourceUsage getSchedulingResourceUsage() { + return schedulingResourceUsage; + } + + @Override + public synchronized void updateSchedulingResourceUsage() { + ResourceUsage updUsage = new ResourceUsage(); + updUsage.copyAllUsed(attemptResourceUsage); + updUsage.setPending(getDemand()); + schedulingResourceUsage = updUsage; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java new file mode 100644 index 0000000..404306e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + + +/** + * An OrderingPolicy which can serve as a baseclass for policies which can be + * expressed in terms of comparators + */ +public abstract class AbstractComparatorOrderingPolicy implements OrderingPolicy { + + private static final Log LOG = LogFactory.getLog(OrderingPolicy.class); + + //Some policies will use multiple comparators joined together + protected static class CompoundComparator implements Comparator { + + List> comparators; + + CompoundComparator(List> comparators) { + this.comparators = comparators; + } + + @Override + public int compare(SchedulableEntity r1, SchedulableEntity r2) { + for (Comparator comparator : comparators) { + int result = comparator.compare(r1, r2); + if (result != 0) return result; + } + return 0; + } + } + + protected TreeSet schedulableEntities; + protected Comparator comparator; + + public AbstractComparatorOrderingPolicy() { } + + @Override + public Collection getSchedulableEntities() { + return schedulableEntities; + } + + @Override + public Iterator getAssignmentIterator() { + return schedulableEntities.iterator(); + } + + @Override + public Iterator getPreemptionIterator() { + return schedulableEntities.descendingIterator(); + } + + protected void reorderSchedulableEntity(S schedulableEntity) { + //remove, update comparable data, and reinsert to update position in order + schedulableEntities.remove(schedulableEntity); + schedulableEntity.updateSchedulingResourceUsage(); + schedulableEntities.add(schedulableEntity); + } + + public void setComparator(Comparator comparator) { + this.comparator = comparator; + TreeSet schedulableEntities = new TreeSet(comparator); + if (this.schedulableEntities != null) { + schedulableEntities.addAll(this.schedulableEntities); + } + this.schedulableEntities = schedulableEntities; + } + + public Comparator getComparator() { + return comparator; + } + + @Override + public void addSchedulableEntity(S s) { + schedulableEntities.add(s); + } + + @Override + public boolean removeSchedulableEntity(S s) { + return schedulableEntities.remove(s); + } + + @Override + public void addAllSchedulableEntities(Collection sc) { + schedulableEntities.addAll(sc); + } + + @Override + public int getNumSchedulableEntities() { + return schedulableEntities.size(); + } + + @Override + public abstract void configure(String conf); + + @Override + public abstract void containerAllocated(S schedulableEntity, + RMContainer r); + + @Override + public abstract void containerReleased(S schedulableEntity, + RMContainer r); + + @Override + public abstract String getInfo(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java new file mode 100644 index 0000000..c53341a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * An OrderingPolicy which orders SchedulableEntities by input order + */ +public class FifoOrderingPolicy extends AbstractComparatorOrderingPolicy { + + protected static class FifoComparator + implements Comparator { + @Override + public int compare(SchedulableEntity r1, SchedulableEntity r2) { + int res = r1.compareInputOrderTo(r2); + return res; + } + } + + public FifoOrderingPolicy() { + setComparator(new FifoComparator()); + } + + @Override + public void configure(String conf) { + + } + + @Override + public void containerAllocated(S schedulableEntity, + RMContainer r) { + } + + @Override + public void containerReleased(S schedulableEntity, + RMContainer r) { + } + + @Override + public String getInfo() { + return "FifoOrderingPolicy"; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java new file mode 100644 index 0000000..29ed58b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + + +/** + * OrderingPolicy is used by the scheduler to order SchedulableEntities for + * container assignment and preemption + */ +public interface OrderingPolicy { + /* + * Note: OrderingPolicy depends upon external + * synchronization of all use of the SchedulableEntity Collection and + * Iterators for correctness and to avoid concurrent modification issues + */ + + /** + * Get the collection of SchedulableEntities which are managed by this + * OrderingPolicy - should include processes returned by the Assignment and + * Preemption iterator with no guarantees regarding order + */ + public Collection getSchedulableEntities(); + + /** + * Return an iterator over the collection of SchedulableEntities which orders + * them for container assignment + */ + public Iterator getAssignmentIterator(); + + /** + * Return an iterator over the collection of SchedulableEntities which orders + * them for preemption + */ + public Iterator getPreemptionIterator(); + + /** + * Add a SchedulableEntity to be managed for allocation and preemption + * ordering + */ + public void addSchedulableEntity(S s); + + /** + * Remove a SchedulableEntity from management for allocation and preemption + * ordering + */ + public boolean removeSchedulableEntity(S s); + + /** + * Add a collection of SchedulableEntities to be managed for allocation + * and preemption ordering + */ + public void addAllSchedulableEntities(Collection sc); + + /** + * Get the number of SchedulableEntities managed for allocation and + * preemption ordering + */ + public int getNumSchedulableEntities(); + + /** + * Provides configuration information for the policy from the scheduler + * configuration + */ + public void configure(String conf); + + /** + * The passed SchedulableEntity has been allocated the passed Container, + * take appropriate action (depending on comparator, a reordering of the + * SchedulableEntity may be required) + */ + public void containerAllocated(S schedulableEntity, + RMContainer r); + + /** + * The passed SchedulableEntity has released the passed Container, + * take appropriate action (depending on comparator, a reordering of the + * SchedulableEntity may be required) + */ + public void containerReleased(S schedulableEntity, + RMContainer r); + + /** + * Display information regarding configuration & status + */ + public String getInfo(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java new file mode 100644 index 0000000..ea3e76a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; + + +/** + * A SchedulableEntity is a process to be scheduled, + * for example, an application / application attempt + */ +public interface SchedulableEntity { + + /** + * Id - each process must have a unique id + */ + public String getId(); + + /** + * Compare the passed SchedulableEntity to this one for input order. + * Input order is implementation defined and should reflect the + * correct ordering for first-in first-out processing + */ + public int compareInputOrderTo(SchedulableEntity other); + + /** + * Use to manage SchedulingConsumption, SchedulingDemand, and any other + * mutable values to insure ordering consistency. Aforementioned properties + * should not change except when this method is called + */ + public void updateSchedulingResourceUsage(); + + /** + * View of Resources wanted and consumed by the process who's + * mutation is managed for the scheduler through + * updateSchedulingResourceUsage. + */ + public ResourceUsage getSchedulingResourceUsage(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java new file mode 100644 index 0000000..2feab77 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; + + +public class MockSchedulableEntity implements SchedulableEntity { + + private String id; + private long serial = 0; + private Resource consumption = Resources.createResource(0); + private Resource demand = Resources.createResource(0); + private Resource schedulingConsumption = Resources.createResource(0); + private Resource schedulingDemand = Resources.createResource(0); + + public MockSchedulableEntity() { } + + public void setId(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public void setSerial(long serial) { + this.serial = serial; + } + + public long getSerial() { + return serial; + } + + public void setConsumption(Resource consumption) { + this.consumption = consumption; + } + + public void setDemand(Resource demand) { + this.demand = demand; + } + + private ResourceUsage schedulingResourceUsage = new ResourceUsage(); + + @Override + public ResourceUsage getSchedulingResourceUsage() { + return schedulingResourceUsage; + } + + @Override + public void updateSchedulingResourceUsage() { + ResourceUsage updUsage = new ResourceUsage(); + updUsage.setUsed(consumption); + updUsage.setPending(demand); + schedulingResourceUsage = updUsage; + } + + @Override + public int compareInputOrderTo(SchedulableEntity other) { + if (other instanceof MockSchedulableEntity) { + MockSchedulableEntity r2 = (MockSchedulableEntity) other; + int res = (int) Math.signum(getSerial() - r2.getSerial()); + return res; + } + return 1;//let other types go before this, if any + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java new file mode 100644 index 0000000..39a626e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + +public class TestFifoOrderingPolicy { + + @Test + public void testFifoOrderingPolicy() { + FifoOrderingPolicy policy = new FifoOrderingPolicy(); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + Assert.assertEquals(policy.getComparator().compare(r1, r2), 0); + + r1.setSerial(1); + Assert.assertEquals(policy.getComparator().compare(r1, r2), 1); + + r2.setSerial(2); + Assert.assertEquals(policy.getComparator().compare(r1, r2), -1); + } + + @Test + public void testIterators() { + OrderingPolicy schedOrder = + new FifoOrderingPolicy(); + + MockSchedulableEntity msp1 = new MockSchedulableEntity(); + MockSchedulableEntity msp2 = new MockSchedulableEntity(); + MockSchedulableEntity msp3 = new MockSchedulableEntity(); + + msp1.setSerial(3); + msp2.setSerial(2); + msp3.setSerial(1); + + schedOrder.addSchedulableEntity(msp1); + schedOrder.addSchedulableEntity(msp2); + schedOrder.addSchedulableEntity(msp3); + + //Assignment, oldest to youngest + checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3}); + + //Preemption, youngest to oldest + checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1}); + } + + public void checkSerials(Iterator si, + long[] serials) { + for (int i = 0;i < serials.length;i++) { + Assert.assertEquals(si.next().getSerial(), + serials[i]); + } + } + +}