diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 91fd12c..19953f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -34,15 +34,18 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -326,13 +329,104 @@ public static String getDefaultReservationSystem(ResourceScheduler scheduler) { return null; } - protected abstract Plan initializePlan(String planQueueName) - throws YarnException; + protected Plan initializePlan(String planQueueName) throws YarnException { + String planQueuePath = getPlanQueuePath(planQueueName); + SharingPolicy adPolicy = getAdmissionPolicy(planQueuePath); + adPolicy.init(planQueuePath, getReservationSchedulerConfiguration()); + // Calculate the max plan capacity + Resource minAllocation = getMinAllocation(); + Resource maxAllocation = getMaxAllocation(); + ResourceCalculator rescCalc = getResourceCalculator(); + Resource totCap = getPlanQueueCapacity(planQueueName); + Plan plan = + new InMemoryPlan(getRootQueueMetrics(), adPolicy, + getAgent(planQueuePath), totCap, planStepSize, rescCalc, + minAllocation, maxAllocation, planQueueName, + getReplanner(planQueuePath), getMoveOnExpiry (planQueuePath)); + LOG.info("Intialized plan {0} based on reservable queue {1}", + plan.toString(), planQueueName); + return plan; + } + + protected Planner getReplanner(String planQueueName) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String plannerClassName = reservationConfig.getReplanner(planQueueName); + LOG.info("Using Replanner: " + plannerClassName + " for queue: " + + planQueueName); + try { + Class plannerClazz = conf.getClassByName(plannerClassName); + if (Planner.class.isAssignableFrom(plannerClazz)) { + Planner planner = + (Planner) ReflectionUtils.newInstance(plannerClazz, conf); + planner.init(planQueueName, reservationConfig); + return planner; + } else { + throw new YarnRuntimeException("Class: " + plannerClazz + + " not instance of " + Planner.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate Planner: " + + plannerClassName + " for queue: " + planQueueName, e); + } + } + + protected ReservationAgent getAgent(String queueName) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String agentClassName = reservationConfig.getReservationAgent(queueName); + LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); + try { + Class agentClazz = conf.getClassByName(agentClassName); + if (ReservationAgent.class.isAssignableFrom(agentClazz)) { + return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); + } else { + throw new YarnRuntimeException("Class: " + agentClassName + + " not instance of " + ReservationAgent.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate Agent: " + + agentClassName + " for queue: " + queueName, e); + } + } + + protected SharingPolicy getAdmissionPolicy(String queueName) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String admissionPolicyClassName = + reservationConfig.getReservationAdmissionPolicy(queueName); + LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName + + " for queue: " + queueName); + try { + Class admissionPolicyClazz = + conf.getClassByName(admissionPolicyClassName); + if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { + return (SharingPolicy) ReflectionUtils.newInstance( + admissionPolicyClazz, conf); + } else { + throw new YarnRuntimeException("Class: " + admissionPolicyClassName + + " not instance of " + SharingPolicy.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " + + admissionPolicyClassName + " for queue: " + queueName, e); + } + } + + protected abstract ReservationSchedulerConfiguration + getReservationSchedulerConfiguration(); + + protected abstract String getPlanQueuePath(String planQueueName); + + protected abstract Resource getPlanQueueCapacity(String planQueueName); + + protected abstract boolean getMoveOnExpiry(String planQueueName); - protected abstract Planner getReplanner(String planQueueName); + protected abstract Resource getMinAllocation(); - protected abstract ReservationAgent getAgent(String queueName); + protected abstract Resource getMaxAllocation(); - protected abstract SharingPolicy getAdmissionPolicy(String queueName); + protected abstract ResourceCalculator getResourceCalculator(); + protected abstract QueueMetrics getRootQueueMetrics(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java index 7552e8c..afba7ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -21,13 +21,11 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -52,7 +50,7 @@ @Unstable public class CapacityOverTimePolicy implements SharingPolicy { - private CapacitySchedulerConfiguration conf; + private ReservationSchedulerConfiguration conf; private long validWindow; private float maxInst; private float maxAvg; @@ -61,13 +59,9 @@ // configuration structure of the schedulers (e.g., SchedulerConfiguration) // it should be easy to remove this limitation @Override - public void init(String reservationQueuePath, Configuration conf) { - if (!(conf instanceof CapacitySchedulerConfiguration)) { - throw new IllegalArgumentException("Unexpected conf type: " - + conf.getClass().getSimpleName() + " only supported conf is: " - + CapacitySchedulerConfiguration.class.getSimpleName()); - } - this.conf = (CapacitySchedulerConfiguration) conf; + public void init(String reservationQueuePath, + ReservationSchedulerConfiguration conf) { + this.conf = conf; validWindow = this.conf.getReservationWindow(reservationQueuePath); maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java index 9bce0d9..9d614fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java @@ -26,10 +26,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,95 +67,50 @@ public void reinitialize(Configuration conf, RMContext rmContext) } @Override - protected Plan initializePlan(String planQueueName) throws YarnException { - SharingPolicy adPolicy = getAdmissionPolicy(planQueueName); - String planQueuePath = capScheduler.getQueue(planQueueName).getQueuePath(); - adPolicy.init(planQueuePath, capScheduler.getConfiguration()); - CSQueue planQueue = capScheduler.getQueue(planQueueName); - // Calculate the max plan capacity - Resource minAllocation = capScheduler.getMinimumResourceCapability(); - ResourceCalculator rescCalc = capScheduler.getResourceCalculator(); - Resource totCap = - rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(), - planQueue.getAbsoluteCapacity(), minAllocation); - Plan plan = - new InMemoryPlan(capScheduler.getRootQueueMetrics(), adPolicy, - getAgent(planQueuePath), totCap, planStepSize, rescCalc, - minAllocation, capScheduler.getMaximumResourceCapability(), - planQueueName, getReplanner(planQueuePath), capScheduler - .getConfiguration().getMoveOnExpiry(planQueuePath)); - LOG.info("Intialized plan {0} based on reservable queue {1}", - plan.toString(), planQueueName); - return plan; + protected Resource getMinAllocation() { + return capScheduler.getMinimumResourceCapability(); } @Override - protected Planner getReplanner(String planQueueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); - String plannerClassName = capSchedulerConfig.getReplanner(planQueueName); - LOG.info("Using Replanner: " + plannerClassName + " for queue: " - + planQueueName); - try { - Class plannerClazz = - capSchedulerConfig.getClassByName(plannerClassName); - if (Planner.class.isAssignableFrom(plannerClazz)) { - Planner planner = - (Planner) ReflectionUtils.newInstance(plannerClazz, conf); - planner.init(planQueueName, capSchedulerConfig); - return planner; - } else { - throw new YarnRuntimeException("Class: " + plannerClazz - + " not instance of " + Planner.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException("Could not instantiate Planner: " - + plannerClassName + " for queue: " + planQueueName, e); - } + protected Resource getMaxAllocation() { + return capScheduler.getMaximumResourceCapability(); } @Override - protected ReservationAgent getAgent(String queueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); - String agentClassName = capSchedulerConfig.getReservationAgent(queueName); - LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); - try { - Class agentClazz = capSchedulerConfig.getClassByName(agentClassName); - if (ReservationAgent.class.isAssignableFrom(agentClazz)) { - return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + agentClassName - + " not instance of " + ReservationAgent.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException("Could not instantiate Agent: " - + agentClassName + " for queue: " + queueName, e); - } + protected ResourceCalculator getResourceCalculator() { + return capScheduler.getResourceCalculator(); } @Override - protected SharingPolicy getAdmissionPolicy(String queueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); - String admissionPolicyClassName = - capSchedulerConfig.getReservationAdmissionPolicy(queueName); - LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName - + " for queue: " + queueName); - try { - Class admissionPolicyClazz = - capSchedulerConfig.getClassByName(admissionPolicyClassName); - if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { - return (SharingPolicy) ReflectionUtils.newInstance( - admissionPolicyClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + admissionPolicyClassName - + " not instance of " + SharingPolicy.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " - + admissionPolicyClassName + " for queue: " + queueName, e); - } + protected QueueMetrics getRootQueueMetrics() { + return capScheduler.getRootQueueMetrics(); + } + + @Override + protected boolean getMoveOnExpiry(String planQueuePath) { + return capScheduler.getConfiguration().getMoveOnExpiry(planQueuePath); + } + + @Override + protected String getPlanQueuePath(String planQueueName) { + return capScheduler.getQueue(planQueueName).getQueuePath(); + } + + @Override + protected Resource getPlanQueueCapacity(String planQueueName) { + Resource minAllocation = getMinAllocation(); + ResourceCalculator rescCalc = getResourceCalculator(); + CSQueue planQueue = capScheduler.getQueue(planQueueName); + return rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(), + planQueue.getAbsoluteCapacity(), minAllocation); + } + + @Override + protected ReservationSchedulerConfiguration + getReservationSchedulerConfiguration() { + return capScheduler.getConfiguration(); } } + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java index 23f2be4..f87e9dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -20,7 +20,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -85,7 +84,8 @@ public long getValidWindow() { } @Override - public void init(String planQueuePath, Configuration conf) { + public void init(String planQueuePath, + ReservationSchedulerConfiguration conf) { // nothing to do for this policy } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java index d2b6184..57f28ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java @@ -20,7 +20,6 @@ import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -44,5 +43,5 @@ public void plan(Plan plan, List contracts) * @param planQueueName the name of the queue for this plan * @param conf the scheduler configuration */ - void init(String planQueueName, Configuration conf); + void init(String planQueueName, ReservationSchedulerConfiguration conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java new file mode 100644 index 0000000..0e6d9bd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; + +public interface ReservationSchedulerConfiguration { + + @InterfaceAudience.Private + long DEFAULT_RESERVATION_WINDOW = 86400000L; + + @InterfaceAudience.Private + String DEFAULT_RESERVATION_ADMISSION_POLICY = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy"; + + @InterfaceAudience.Private + String DEFAULT_RESERVATION_AGENT_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; + + @InterfaceAudience.Private + String DEFAULT_RESERVATION_PLANNER_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; + + @InterfaceAudience.Private + boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true; + + // default to 1h lookahead enforcement + @InterfaceAudience.Private + long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000; + + @InterfaceAudience.Private + boolean DEFAULT_SHOW_RESERVATIONS_AS_QUEUES = false; + + @InterfaceAudience.Private + float DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER = 1; + + /** + * Checks if the queue participates in reservation based scheduling + * @param queue + * @return true if the queue participates in reservation based scheduling + */ + boolean isReservable(String queue); + + /** + * Gets the length of time in milliseconds for which the {@link SharingPolicy} + * checks for validity + * @param queue name of the queue + * @return length in time in milliseconds for which to check the + * {@link SharingPolicy} + */ + long getReservationWindow(String queue); + + /** + * Gets the average allowed capacity which will aggregated over the + * {@link ReservationSchedulerConfiguration#getReservationWindow} by the + * the {@linkSharingPolicy} to check aggregate used capacity + * @param queue name of the queue + * @return average capacity allowed by the {@link SharingPolicy} + */ + float getAverageCapacity(String queue); + + /** + * Gets the maximum capacity at any time that the {@link SharingPolicy} allows + * @param queue name of the queue + * @return maximum allowed capacity at any time + */ + float getInstantaneousMaxCapacity(String queue); + + /** + * Gets the name of the {@link SharingPolicy} class associated with the queue + * @param queue name of the queue + * @return the class name of the {@link SharingPolicy} + */ + String getReservationAdmissionPolicy(String queue); + + /** + * Gets the name of the {@link ReservationAgent} class associated with the + * queue + * @param queue name of the queue + * @return the class name of the {@link ReservationAgent} + */ + String getReservationAgent(String queue); + + /** + * Checks whether the reservation queues be hidden or visible + * @param queuePath name of the queue + * @return true if reservation queues should be visible + */ + boolean getShowReservationAsQueues(String queuePath); + + /** + * Gets the name of the {@link Planner} class associated with the + * queue + * @param queue name of the queue + * @return the class name of the {@link Planner} + */ + String getReplanner(String queue); + + /** + * Gets whether the applications should be killed or moved to the parent queue + * when the {@link ReservationDefinition} expires + * @param queue name of the queue + * @return true if application should be moved, false if they need to be + * killed + */ + boolean getMoveOnExpiry(String queue); + + /** + * Gets the time in milliseconds for which the {@link Planner} will verify + * the {@link Plan}s satisfy the constraints + * @param queue name of the queue + * @return the time in milliseconds for which to check constraints + */ + long getEnforcementWindow(String queue); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java index 3e5452e..8f8d24c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java @@ -20,7 +20,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; /** @@ -38,7 +37,7 @@ * @param planQueuePath the name of the queue for this plan * @param conf the system configuration */ - public void init(String planQueuePath, Configuration conf); + public void init(String planQueuePath, ReservationSchedulerConfiguration conf); /** * This method runs the policy validation logic, and return true/false on diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java index e38dd3cd..b5a6a99 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java @@ -25,11 +25,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -69,15 +67,9 @@ public SimpleCapacityReplanner() { } @Override - public void init(String planQueueName, Configuration conf) { - if (!(conf instanceof CapacitySchedulerConfiguration)) { - throw new IllegalArgumentException("Unexpected conf type: " - + conf.getClass().getSimpleName() + " only supported conf is: " - + CapacitySchedulerConfiguration.class.getSimpleName()); - } - this.lengthOfCheckZone = - ((CapacitySchedulerConfiguration) conf) - .getEnforcementWindow(planQueueName); + public void init(String planQueueName, + ReservationSchedulerConfiguration conf) { + this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 5beed37..cf25ba7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -41,13 +41,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.ImmutableSet; -public class CapacitySchedulerConfiguration extends Configuration { +public class CapacitySchedulerConfiguration extends Configuration implements ReservationSchedulerConfiguration { private static final Log LOG = LogFactory.getLog(CapacitySchedulerConfiguration.class); @@ -222,9 +223,6 @@ public QueueMapping(MappingType type, String source, String queue) { "instantaneous-max-capacity"; @Private - public static final long DEFAULT_RESERVATION_WINDOW = 86400000L; - - @Private public static final String RESERVATION_ADMISSION_POLICY = "reservation-policy"; @@ -236,35 +234,16 @@ public QueueMapping(MappingType type, String source, String queue) { "show-reservations-as-queues"; @Private - public static final String DEFAULT_RESERVATION_ADMISSION_POLICY = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy"; - - @Private - public static final String DEFAULT_RESERVATION_AGENT_NAME = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; - - @Private public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; @Private - public static final String DEFAULT_RESERVATION_PLANNER_NAME = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; - - @Private public static final String RESERVATION_MOVE_ON_EXPIRY = "reservation-move-on-expiry"; @Private - public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true; - - @Private public static final String RESERVATION_ENFORCEMENT_WINDOW = "reservation-enforcement-window"; - // default to 1h lookahead enforcement - @Private - public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000; - public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -715,6 +694,7 @@ public boolean getOverrideWithQueueMappings() { return mappings; } + @Override public boolean isReservable(String queue) { boolean isReservable = getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false); @@ -727,6 +707,7 @@ public void setReservable(String queue, boolean isReservable) { + ", isReservableQueue=" + isReservable(queue)); } + @Override public long getReservationWindow(String queue) { long reservationWindow = getLong(getQueuePrefix(queue) + RESERVATION_WINDOW, @@ -734,6 +715,7 @@ public long getReservationWindow(String queue) { return reservationWindow; } + @Override public float getAverageCapacity(String queue) { float avgCapacity = getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, @@ -741,6 +723,7 @@ public float getAverageCapacity(String queue) { return avgCapacity; } + @Override public float getInstantaneousMaxCapacity(String queue) { float instMaxCapacity = getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY, @@ -761,6 +744,7 @@ public void setAverageCapacity(String queue, float avgCapacity) { setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity); } + @Override public String getReservationAdmissionPolicy(String queue) { String reservationPolicy = get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, @@ -773,6 +757,7 @@ public void setReservationAdmissionPolicy(String queue, set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy); } + @Override public String getReservationAgent(String queue) { String reservationAgent = get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, @@ -784,13 +769,16 @@ public void setReservationAgent(String queue, String reservationPolicy) { set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy); } + @Override public boolean getShowReservationAsQueues(String queuePath) { boolean showReservationAsQueues = getBoolean(getQueuePrefix(queuePath) - + RESERVATION_SHOW_RESERVATION_AS_QUEUE, false); + + RESERVATION_SHOW_RESERVATION_AS_QUEUE, + DEFAULT_SHOW_RESERVATIONS_AS_QUEUES); return showReservationAsQueues; } + @Override public String getReplanner(String queue) { String replanner = get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME, @@ -798,6 +786,7 @@ public String getReplanner(String queue) { return replanner; } + @Override public boolean getMoveOnExpiry(String queue) { boolean killOnExpiry = getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY, @@ -805,6 +794,7 @@ public boolean getMoveOnExpiry(String queue) { return killOnExpiry; } + @Override public long getEnforcementWindow(String queue) { long enforcementWindow = getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 8b662cc..96fdccc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -61,6 +61,18 @@ public static ReservationId getNewReservationId() { return ReservationId.newInstance(rand.nextLong(), rand.nextLong()); } + public static ReservationSchedulerConfiguration createConf( + String reservationQ, long timeWindow, float instConstraint, + float avgConstraint) { + ReservationSchedulerConfiguration conf = mock + (ReservationSchedulerConfiguration.class); + when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow); + when(conf.getInstantaneousMaxCapacity(reservationQ)).thenReturn + (instConstraint); + when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint); + return conf; + } + @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { @@ -105,7 +117,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { Assert.fail(e.getMessage()); } when(mockRmContext.getScheduler()).thenReturn(cs); - Resource r = Resource.newInstance(numContainers * 1024, numContainers); + Resource r = calculateClusterResource(numContainers); doReturn(r).when(cs).getClusterResource(); return cs; } @@ -254,4 +266,9 @@ public static ReservationDefinition generateBigRR(Random rand, long i) { return req; } + public static Resource calculateClusterResource(int numContainers) { + Resource clusterResource = Resource.newInstance(numContainers * 1024, + numContainers); + return clusterResource; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index 451a155..61561e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Map; @@ -29,8 +30,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; @@ -74,18 +75,18 @@ public void setup() throws Exception { mAgent = mock(ReservationAgent.class); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); + QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint); - capConf.setAverageCapacity(reservationQ, avgConstraint); + Resource clusterResource = testUtil.calculateClusterResource(totCont); + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = - new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, - scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + new InMemoryPlan(rootQueueMetrics, policy, mAgent, + clusterResource, step, res, minAlloc, maxAlloc, "dedicated", null, true); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java index f1e1e5c..b8cf6c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java @@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -71,18 +69,19 @@ public void setup() throws Exception { Resource clusterCapacity = Resource.newInstance(100 * 1024, 100); step = 1000L; ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setMaximumCapacity(reservationQ, 100); - capConf.setAverageCapacity(reservationQ, 100); + + float instConstraint = 100; + float avgConstraint = 100; + + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); agent = new GreedyReservationAgent(); - QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated", - mock(ParentQueue.class), false, capConf); + QueueMetrics queueMetrics = mock(QueueMetrics.class); plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); @@ -549,12 +548,13 @@ public void testStress(int numJobs) throws PlanningException, IOException { ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setMaximumCapacity(reservationQ, 100); - capConf.setAverageCapacity(reservationQ, 100); + float instConstraint = 100; + float avgConstraint = 100; + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent, clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java index 9389f12..1e15618 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -27,8 +27,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Before; @@ -60,15 +59,17 @@ public void setup() throws Exception { mAgent = mock(ReservationAgent.class); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); + QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); + Resource clusterResource = testUtil.calculateClusterResource(totCont); + ReservationSchedulerConfiguration conf = mock + (ReservationSchedulerConfiguration.class); NoOverCommitPolicy policy = new NoOverCommitPolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = - new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, - scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + new InMemoryPlan(rootQueueMetrics, policy, mAgent, + clusterResource, step, res, minAlloc, maxAlloc, "dedicated", null, true); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java index c94ef69..1ca9f2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -60,13 +59,10 @@ public void testReplanningPlanCapacityLoss() throws PlanningException { when(clock.getTime()).thenReturn(0L); SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock); - CapacitySchedulerConfiguration conf = - mock(CapacitySchedulerConfiguration.class); + ReservationSchedulerConfiguration conf = + mock(ReservationSchedulerConfiguration.class); when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L); - conf.setLong(CapacitySchedulerConfiguration.PREFIX + "blah" - + CapacitySchedulerConfiguration.DOT - + CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW, 6); enf.init("blah", conf); // Initialize the plan with more resources