diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 91fd12c..1539a6e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -34,15 +34,18 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -326,13 +329,103 @@ public static String getDefaultReservationSystem(ResourceScheduler scheduler) { return null; } - protected abstract Plan initializePlan(String planQueueName) - throws YarnException; + protected Plan initializePlan(String planQueueName) throws YarnException { + String planQueuePath = getPlanQueuePath(planQueueName); + SharingPolicy adPolicy = getAdmissionPolicy(planQueuePath); + adPolicy.init(planQueuePath, getReservationSchedulerConfiguration()); + // Calculate the max plan capacity + Resource minAllocation = getMinAllocation(); + Resource maxAllocation = getMaxAllocation(); + ResourceCalculator rescCalc = getResourceCalculator(); + Resource totCap = getPlanQueueCapacity(planQueueName); + Plan plan = + new InMemoryPlan(getRootQueueMetrics(), adPolicy, + getAgent(planQueuePath), totCap, planStepSize, rescCalc, + minAllocation, maxAllocation, planQueueName, + getReplanner(planQueuePath), getReservationSchedulerConfiguration() + .getMoveOnExpiry(planQueuePath)); + LOG.info("Intialized plan {0} based on reservable queue {1}", + plan.toString(), planQueueName); + return plan; + } + + protected Planner getReplanner(String planQueueName) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String plannerClassName = reservationConfig.getReplanner(planQueueName); + LOG.info("Using Replanner: " + plannerClassName + " for queue: " + + planQueueName); + try { + Class plannerClazz = conf.getClassByName(plannerClassName); + if (Planner.class.isAssignableFrom(plannerClazz)) { + Planner planner = + (Planner) ReflectionUtils.newInstance(plannerClazz, conf); + planner.init(planQueueName, reservationConfig); + return planner; + } else { + throw new YarnRuntimeException("Class: " + plannerClazz + + " not instance of " + Planner.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate Planner: " + + plannerClassName + " for queue: " + planQueueName, e); + } + } + + protected ReservationAgent getAgent(String queueName) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String agentClassName = reservationConfig.getReservationAgent(queueName); + LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); + try { + Class agentClazz = conf.getClassByName(agentClassName); + if (ReservationAgent.class.isAssignableFrom(agentClazz)) { + return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); + } else { + throw new YarnRuntimeException("Class: " + agentClassName + + " not instance of " + ReservationAgent.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate Agent: " + + agentClassName + " for queue: " + queueName, e); + } + } + + protected SharingPolicy getAdmissionPolicy(String queueName) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String admissionPolicyClassName = + reservationConfig.getReservationAdmissionPolicy(queueName); + LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName + + " for queue: " + queueName); + try { + Class admissionPolicyClazz = + conf.getClassByName(admissionPolicyClassName); + if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { + return (SharingPolicy) ReflectionUtils.newInstance( + admissionPolicyClazz, conf); + } else { + throw new YarnRuntimeException("Class: " + admissionPolicyClassName + + " not instance of " + SharingPolicy.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " + + admissionPolicyClassName + " for queue: " + queueName, e); + } + } + + protected abstract ReservationSchedulerConfiguration + getReservationSchedulerConfiguration(); + + protected abstract String getPlanQueuePath(String planQueueName); + + protected abstract Resource getPlanQueueCapacity(String planQueueName); - protected abstract Planner getReplanner(String planQueueName); + protected abstract Resource getMinAllocation(); - protected abstract ReservationAgent getAgent(String queueName); + protected abstract Resource getMaxAllocation(); - protected abstract SharingPolicy getAdmissionPolicy(String queueName); + protected abstract ResourceCalculator getResourceCalculator(); + protected abstract QueueMetrics getRootQueueMetrics(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java index 7552e8c..afba7ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -21,13 +21,11 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -52,7 +50,7 @@ @Unstable public class CapacityOverTimePolicy implements SharingPolicy { - private CapacitySchedulerConfiguration conf; + private ReservationSchedulerConfiguration conf; private long validWindow; private float maxInst; private float maxAvg; @@ -61,13 +59,9 @@ // configuration structure of the schedulers (e.g., SchedulerConfiguration) // it should be easy to remove this limitation @Override - public void init(String reservationQueuePath, Configuration conf) { - if (!(conf instanceof CapacitySchedulerConfiguration)) { - throw new IllegalArgumentException("Unexpected conf type: " - + conf.getClass().getSimpleName() + " only supported conf is: " - + CapacitySchedulerConfiguration.class.getSimpleName()); - } - this.conf = (CapacitySchedulerConfiguration) conf; + public void init(String reservationQueuePath, + ReservationSchedulerConfiguration conf) { + this.conf = conf; validWindow = this.conf.getReservationWindow(reservationQueuePath); maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java index 9bce0d9..be0a708 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java @@ -21,15 +21,14 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,95 +66,45 @@ public void reinitialize(Configuration conf, RMContext rmContext) } @Override - protected Plan initializePlan(String planQueueName) throws YarnException { - SharingPolicy adPolicy = getAdmissionPolicy(planQueueName); - String planQueuePath = capScheduler.getQueue(planQueueName).getQueuePath(); - adPolicy.init(planQueuePath, capScheduler.getConfiguration()); - CSQueue planQueue = capScheduler.getQueue(planQueueName); - // Calculate the max plan capacity - Resource minAllocation = capScheduler.getMinimumResourceCapability(); - ResourceCalculator rescCalc = capScheduler.getResourceCalculator(); - Resource totCap = - rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(), - planQueue.getAbsoluteCapacity(), minAllocation); - Plan plan = - new InMemoryPlan(capScheduler.getRootQueueMetrics(), adPolicy, - getAgent(planQueuePath), totCap, planStepSize, rescCalc, - minAllocation, capScheduler.getMaximumResourceCapability(), - planQueueName, getReplanner(planQueuePath), capScheduler - .getConfiguration().getMoveOnExpiry(planQueuePath)); - LOG.info("Intialized plan {0} based on reservable queue {1}", - plan.toString(), planQueueName); - return plan; + protected Resource getMinAllocation() { + return capScheduler.getMinimumResourceCapability(); } @Override - protected Planner getReplanner(String planQueueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); - String plannerClassName = capSchedulerConfig.getReplanner(planQueueName); - LOG.info("Using Replanner: " + plannerClassName + " for queue: " - + planQueueName); - try { - Class plannerClazz = - capSchedulerConfig.getClassByName(plannerClassName); - if (Planner.class.isAssignableFrom(plannerClazz)) { - Planner planner = - (Planner) ReflectionUtils.newInstance(plannerClazz, conf); - planner.init(planQueueName, capSchedulerConfig); - return planner; - } else { - throw new YarnRuntimeException("Class: " + plannerClazz - + " not instance of " + Planner.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException("Could not instantiate Planner: " - + plannerClassName + " for queue: " + planQueueName, e); - } + protected Resource getMaxAllocation() { + return capScheduler.getMaximumResourceCapability(); } @Override - protected ReservationAgent getAgent(String queueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); - String agentClassName = capSchedulerConfig.getReservationAgent(queueName); - LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); - try { - Class agentClazz = capSchedulerConfig.getClassByName(agentClassName); - if (ReservationAgent.class.isAssignableFrom(agentClazz)) { - return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + agentClassName - + " not instance of " + ReservationAgent.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException("Could not instantiate Agent: " - + agentClassName + " for queue: " + queueName, e); - } + protected ResourceCalculator getResourceCalculator() { + return capScheduler.getResourceCalculator(); } @Override - protected SharingPolicy getAdmissionPolicy(String queueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); - String admissionPolicyClassName = - capSchedulerConfig.getReservationAdmissionPolicy(queueName); - LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName - + " for queue: " + queueName); - try { - Class admissionPolicyClazz = - capSchedulerConfig.getClassByName(admissionPolicyClassName); - if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { - return (SharingPolicy) ReflectionUtils.newInstance( - admissionPolicyClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + admissionPolicyClassName - + " not instance of " + SharingPolicy.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " - + admissionPolicyClassName + " for queue: " + queueName, e); - } + protected QueueMetrics getRootQueueMetrics() { + return capScheduler.getRootQueueMetrics(); + } + + @Override + protected String getPlanQueuePath(String planQueueName) { + return capScheduler.getQueue(planQueueName).getQueuePath(); + } + + @Override + protected Resource getPlanQueueCapacity(String planQueueName) { + Resource minAllocation = getMinAllocation(); + ResourceCalculator rescCalc = getResourceCalculator(); + CSQueue planQueue = capScheduler.getQueue(planQueueName); + return rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(), + planQueue.getAbsoluteCapacity(), minAllocation); + } + + @Override + protected ReservationSchedulerConfiguration + getReservationSchedulerConfiguration() { + return capScheduler.getConfiguration(); } } + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java index 23f2be4..f87e9dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -20,7 +20,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -85,7 +84,8 @@ public long getValidWindow() { } @Override - public void init(String planQueuePath, Configuration conf) { + public void init(String planQueuePath, + ReservationSchedulerConfiguration conf) { // nothing to do for this policy } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java index d2b6184..57f28ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java @@ -20,7 +20,6 @@ import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -44,5 +43,5 @@ public void plan(Plan plan, List contracts) * @param planQueueName the name of the queue for this plan * @param conf the scheduler configuration */ - void init(String planQueueName, Configuration conf); + void init(String planQueueName, ReservationSchedulerConfiguration conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java new file mode 100644 index 0000000..2af1ffd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; + +public abstract class ReservationSchedulerConfiguration extends Configuration { + + @InterfaceAudience.Private + public static final long DEFAULT_RESERVATION_WINDOW = 24*60*60*1000; // 1 day in msec + + @InterfaceAudience.Private + public static final String DEFAULT_RESERVATION_ADMISSION_POLICY = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy"; + + @InterfaceAudience.Private + public static final String DEFAULT_RESERVATION_AGENT_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; + + @InterfaceAudience.Private + public static final String DEFAULT_RESERVATION_PLANNER_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; + + @InterfaceAudience.Private + public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true; + + // default to 1h lookahead enforcement + @InterfaceAudience.Private + public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 60*60*1000; + // 1 hour + + @InterfaceAudience.Private + public static final boolean DEFAULT_SHOW_RESERVATIONS_AS_QUEUES = false; + + @InterfaceAudience.Private + public static final float DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER = 1; + + public ReservationSchedulerConfiguration() { super(); } + + public ReservationSchedulerConfiguration( + Configuration configuration) { + super(configuration); + } + + /** + * Checks if the queue participates in reservation based scheduling + * @param queue + * @return true if the queue participates in reservation based scheduling + */ + public abstract boolean isReservable(String queue); + + /** + * Gets the length of time in milliseconds for which the {@link SharingPolicy} + * checks for validity + * @param queue name of the queue + * @return length in time in milliseconds for which to check the + * {@link SharingPolicy} + */ + public long getReservationWindow(String queue) { + return DEFAULT_RESERVATION_WINDOW; + } + + /** + * Gets the average allowed capacity which will aggregated over the + * {@link ReservationSchedulerConfiguration#getReservationWindow} by the + * the {@link SharingPolicy} to check aggregate used capacity + * @param queue name of the queue + * @return average capacity allowed by the {@link SharingPolicy} + */ + public float getAverageCapacity(String queue) { + return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER; + } + + /** + * Gets the maximum capacity at any time that the {@link SharingPolicy} allows + * @param queue name of the queue + * @return maximum allowed capacity at any time + */ + public float getInstantaneousMaxCapacity(String queue) { + return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER; + } + + /** + * Gets the name of the {@link SharingPolicy} class associated with the queue + * @param queue name of the queue + * @return the class name of the {@link SharingPolicy} + */ + public String getReservationAdmissionPolicy(String queue) { + return DEFAULT_RESERVATION_ADMISSION_POLICY; + } + + /** + * Gets the name of the {@link ReservationAgent} class associated with the + * queue + * @param queue name of the queue + * @return the class name of the {@link ReservationAgent} + */ + public String getReservationAgent(String queue) { + return DEFAULT_RESERVATION_AGENT_NAME; + } + + /** + * Checks whether the reservation queues be hidden or visible + * @param queuePath name of the queue + * @return true if reservation queues should be visible + */ + public boolean getShowReservationAsQueues(String queuePath) { + return DEFAULT_SHOW_RESERVATIONS_AS_QUEUES; + } + + /** + * Gets the name of the {@link Planner} class associated with the + * queue + * @param queue name of the queue + * @return the class name of the {@link Planner} + */ + public String getReplanner(String queue) { + return DEFAULT_RESERVATION_PLANNER_NAME; + } + + /** + * Gets whether the applications should be killed or moved to the parent queue + * when the {@link ReservationDefinition} expires + * @param queue name of the queue + * @return true if application should be moved, false if they need to be + * killed + */ + public boolean getMoveOnExpiry(String queue) { + return DEFAULT_RESERVATION_MOVE_ON_EXPIRY; + } + + /** + * Gets the time in milliseconds for which the {@link Planner} will verify + * the {@link Plan}s satisfy the constraints + * @param queue name of the queue + * @return the time in milliseconds for which to check constraints + */ + public long getEnforcementWindow(String queue) { + return DEFAULT_RESERVATION_ENFORCEMENT_WINDOW; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java index 3e5452e..8f8d24c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java @@ -20,7 +20,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; /** @@ -38,7 +37,7 @@ * @param planQueuePath the name of the queue for this plan * @param conf the system configuration */ - public void init(String planQueuePath, Configuration conf); + public void init(String planQueuePath, ReservationSchedulerConfiguration conf); /** * This method runs the policy validation logic, and return true/false on diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java index e38dd3cd..b5a6a99 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java @@ -25,11 +25,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -69,15 +67,9 @@ public SimpleCapacityReplanner() { } @Override - public void init(String planQueueName, Configuration conf) { - if (!(conf instanceof CapacitySchedulerConfiguration)) { - throw new IllegalArgumentException("Unexpected conf type: " - + conf.getClass().getSimpleName() + " only supported conf is: " - + CapacitySchedulerConfiguration.class.getSimpleName()); - } - this.lengthOfCheckZone = - ((CapacitySchedulerConfiguration) conf) - .getEnforcementWindow(planQueueName); + public void init(String planQueueName, + ReservationSchedulerConfiguration conf) { + this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index b36172c..69a51fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -41,13 +41,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.ImmutableSet; -public class CapacitySchedulerConfiguration extends Configuration { +public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { private static final Log LOG = LogFactory.getLog(CapacitySchedulerConfiguration.class); @@ -222,9 +223,6 @@ public QueueMapping(MappingType type, String source, String queue) { "instantaneous-max-capacity"; @Private - public static final long DEFAULT_RESERVATION_WINDOW = 86400000L; - - @Private public static final String RESERVATION_ADMISSION_POLICY = "reservation-policy"; @@ -236,35 +234,16 @@ public QueueMapping(MappingType type, String source, String queue) { "show-reservations-as-queues"; @Private - public static final String DEFAULT_RESERVATION_ADMISSION_POLICY = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy"; - - @Private - public static final String DEFAULT_RESERVATION_AGENT_NAME = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; - - @Private public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; @Private - public static final String DEFAULT_RESERVATION_PLANNER_NAME = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; - - @Private public static final String RESERVATION_MOVE_ON_EXPIRY = "reservation-move-on-expiry"; @Private - public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true; - - @Private public static final String RESERVATION_ENFORCEMENT_WINDOW = "reservation-enforcement-window"; - // default to 1h lookahead enforcement - @Private - public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000; - public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -734,6 +713,7 @@ public void setReservable(String queue, boolean isReservable) { + ", isReservableQueue=" + isReservable(queue)); } + @Override public long getReservationWindow(String queue) { long reservationWindow = getLong(getQueuePrefix(queue) + RESERVATION_WINDOW, @@ -741,6 +721,7 @@ public long getReservationWindow(String queue) { return reservationWindow; } + @Override public float getAverageCapacity(String queue) { float avgCapacity = getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, @@ -748,6 +729,7 @@ public float getAverageCapacity(String queue) { return avgCapacity; } + @Override public float getInstantaneousMaxCapacity(String queue) { float instMaxCapacity = getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY, @@ -768,6 +750,7 @@ public void setAverageCapacity(String queue, float avgCapacity) { setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity); } + @Override public String getReservationAdmissionPolicy(String queue) { String reservationPolicy = get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, @@ -780,6 +763,7 @@ public void setReservationAdmissionPolicy(String queue, set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy); } + @Override public String getReservationAgent(String queue) { String reservationAgent = get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, @@ -791,13 +775,16 @@ public void setReservationAgent(String queue, String reservationPolicy) { set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy); } + @Override public boolean getShowReservationAsQueues(String queuePath) { boolean showReservationAsQueues = getBoolean(getQueuePrefix(queuePath) - + RESERVATION_SHOW_RESERVATION_AS_QUEUE, false); + + RESERVATION_SHOW_RESERVATION_AS_QUEUE, + DEFAULT_SHOW_RESERVATIONS_AS_QUEUES); return showReservationAsQueues; } + @Override public String getReplanner(String queue) { String replanner = get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME, @@ -805,6 +792,7 @@ public String getReplanner(String queue) { return replanner; } + @Override public boolean getMoveOnExpiry(String queue) { boolean killOnExpiry = getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY, @@ -812,6 +800,7 @@ public boolean getMoveOnExpiry(String queue) { return killOnExpiry; } + @Override public long getEnforcementWindow(String queue) { long enforcementWindow = getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 8b662cc..b5e4df2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.TreeMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; @@ -61,6 +63,44 @@ public static ReservationId getNewReservationId() { return ReservationId.newInstance(rand.nextLong(), rand.nextLong()); } + public static ReservationSchedulerConfiguration createConf( + String reservationQ, long timeWindow, float instConstraint, + float avgConstraint) { + ReservationSchedulerConfiguration conf = mock + (ReservationSchedulerConfiguration.class); + when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow); + when(conf.getInstantaneousMaxCapacity(reservationQ)).thenReturn + (instConstraint); + when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint); + return conf; + } + + public static void validateReservationQueue( + AbstractReservationSystem reservationSystem, String planQName) { + Plan plan = reservationSystem.getPlan(planQName); + Assert.assertNotNull(plan); + Assert.assertTrue(plan instanceof InMemoryPlan); + Assert.assertEquals(planQName, plan.getQueueName()); + Assert.assertEquals(8192, plan.getTotalCapacity().getMemory()); + Assert.assertTrue( + plan.getReservationAgent() instanceof GreedyReservationAgent); + Assert.assertTrue( + plan.getSharingPolicy() instanceof CapacityOverTimePolicy); + } + + public static void validateNewReservationQueue( + AbstractReservationSystem reservationSystem, String newQ) { + Plan newPlan = reservationSystem.getPlan(newQ); + Assert.assertNotNull(newPlan); + Assert.assertTrue(newPlan instanceof InMemoryPlan); + Assert.assertEquals(newQ, newPlan.getQueueName()); + Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory()); + Assert + .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent); + Assert + .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); + } + @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { @@ -70,12 +110,35 @@ public CapacityScheduler mockCapacityScheduler(int numContainers) CapacityScheduler cs = Mockito.spy(new CapacityScheduler()); cs.setConf(new YarnConfiguration()); + + RMContext mockRmContext = createRMContext(conf); + + cs.setRMContext(mockRmContext); + try { + cs.serviceInit(conf); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + initializeRMContext(numContainers, cs, mockRmContext); + return cs; + } + + public static void initializeRMContext(int numContainers, + AbstractYarnScheduler scheduler, RMContext mockRMContext) { + + when(mockRMContext.getScheduler()).thenReturn(scheduler); + Resource r = calculateClusterResource(numContainers); + doReturn(r).when(scheduler).getClusterResource(); + } + + public static RMContext createRMContext(Configuration conf) { RMContext mockRmContext = Mockito.spy(new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), null)); - + RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class); when( nlm.getQueueResource(any(String.class), any(Set.class), @@ -86,7 +149,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { return (Resource) args[2]; } }); - + when(nlm.getResourceByLabel(any(String.class), any(Resource.class))) .thenAnswer(new Answer() { @Override @@ -95,19 +158,9 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { return (Resource) args[1]; } }); - + mockRmContext.setNodeLabelManager(nlm); - - cs.setRMContext(mockRmContext); - try { - cs.serviceInit(conf); - } catch (Exception e) { - Assert.fail(e.getMessage()); - } - when(mockRmContext.getScheduler()).thenReturn(cs); - Resource r = Resource.newInstance(numContainers * 1024, numContainers); - doReturn(r).when(cs).getClusterResource(); - return cs; + return mockRmContext; } public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { @@ -174,7 +227,7 @@ public void updateQueueConfiguration(CapacitySchedulerConfiguration conf, // Define 2nd-level queues final String A1 = A + ".a1"; final String A2 = A + ".a2"; - conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setQueues(A, new String[]{"a1", "a2"}); conf.setCapacity(A1, 30); conf.setCapacity(A2, 70); } @@ -254,4 +307,9 @@ public static ReservationDefinition generateBigRR(Random rand, long i) { return req; } + public static Resource calculateClusterResource(int numContainers) { + Resource clusterResource = Resource.newInstance(numContainers * 1024, + numContainers); + return clusterResource; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index 451a155..61561e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Map; @@ -29,8 +30,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; @@ -74,18 +75,18 @@ public void setup() throws Exception { mAgent = mock(ReservationAgent.class); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); + QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint); - capConf.setAverageCapacity(reservationQ, avgConstraint); + Resource clusterResource = testUtil.calculateClusterResource(totCont); + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = - new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, - scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + new InMemoryPlan(rootQueueMetrics, policy, mAgent, + clusterResource, step, res, minAlloc, maxAlloc, "dedicated", null, true); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java index dd68277..11e52c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java @@ -47,15 +47,8 @@ public void testInitialize() { Assert.fail(e.getMessage()); } String planQName = testUtil.getreservationQueueName(); - Plan plan = reservationSystem.getPlan(planQName); - Assert.assertNotNull(plan); - Assert.assertTrue(plan instanceof InMemoryPlan); - Assert.assertEquals(planQName, plan.getQueueName()); - Assert.assertEquals(8192, plan.getTotalCapacity().getMemory()); - Assert - .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent); - Assert - .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy); + ReservationSystemTestUtil.validateReservationQueue(reservationSystem, + planQName); } @Test @@ -80,15 +73,7 @@ public void testReinitialize() { } // Assert queue in original config String planQName = testUtil.getreservationQueueName(); - Plan plan = reservationSystem.getPlan(planQName); - Assert.assertNotNull(plan); - Assert.assertTrue(plan instanceof InMemoryPlan); - Assert.assertEquals(planQName, plan.getQueueName()); - Assert.assertEquals(8192, plan.getTotalCapacity().getMemory()); - Assert - .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent); - Assert - .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy); + ReservationSystemTestUtil.validateReservationQueue(reservationSystem, planQName); // Dynamically add a plan String newQ = "reservation"; @@ -104,16 +89,6 @@ public void testReinitialize() { } catch (YarnException e) { Assert.fail(e.getMessage()); } - Plan newPlan = reservationSystem.getPlan(newQ); - Assert.assertNotNull(newPlan); - Assert.assertTrue(newPlan instanceof InMemoryPlan); - Assert.assertEquals(newQ, newPlan.getQueueName()); - Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory()); - Assert - .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent); - Assert - .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); - + ReservationSystemTestUtil.validateNewReservationQueue(reservationSystem, newQ); } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java index f1e1e5c..b8cf6c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java @@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -71,18 +69,19 @@ public void setup() throws Exception { Resource clusterCapacity = Resource.newInstance(100 * 1024, 100); step = 1000L; ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setMaximumCapacity(reservationQ, 100); - capConf.setAverageCapacity(reservationQ, 100); + + float instConstraint = 100; + float avgConstraint = 100; + + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); agent = new GreedyReservationAgent(); - QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated", - mock(ParentQueue.class), false, capConf); + QueueMetrics queueMetrics = mock(QueueMetrics.class); plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); @@ -549,12 +548,13 @@ public void testStress(int numJobs) throws PlanningException, IOException { ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setMaximumCapacity(reservationQ, 100); - capConf.setAverageCapacity(reservationQ, 100); + float instConstraint = 100; + float avgConstraint = 100; + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent, clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java index 9389f12..1e15618 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -27,8 +27,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Before; @@ -60,15 +59,17 @@ public void setup() throws Exception { mAgent = mock(ReservationAgent.class); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); + QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); + Resource clusterResource = testUtil.calculateClusterResource(totCont); + ReservationSchedulerConfiguration conf = mock + (ReservationSchedulerConfiguration.class); NoOverCommitPolicy policy = new NoOverCommitPolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = - new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, - scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + new InMemoryPlan(rootQueueMetrics, policy, mAgent, + clusterResource, step, res, minAlloc, maxAlloc, "dedicated", null, true); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java index c94ef69..1ca9f2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -60,13 +59,10 @@ public void testReplanningPlanCapacityLoss() throws PlanningException { when(clock.getTime()).thenReturn(0L); SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock); - CapacitySchedulerConfiguration conf = - mock(CapacitySchedulerConfiguration.class); + ReservationSchedulerConfiguration conf = + mock(ReservationSchedulerConfiguration.class); when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L); - conf.setLong(CapacitySchedulerConfiguration.PREFIX + "blah" - + CapacitySchedulerConfiguration.DOT - + CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW, 6); enf.init("blah", conf); // Initialize the plan with more resources