diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 91fd12c..360de2b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -34,15 +34,18 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -329,10 +332,26 @@ public static String getDefaultReservationSystem(ResourceScheduler scheduler) { protected abstract Plan initializePlan(String planQueueName) throws YarnException; + protected abstract ReservationSchedulerConfiguration + getReservationSchedulerConfiguration(); + + protected abstract String getPlanQueuePath(String planQueueName); + + protected abstract Resource getPlanQueueCapacity(String planQueueName); + protected abstract Planner getReplanner(String planQueueName); protected abstract ReservationAgent getAgent(String queueName); protected abstract SharingPolicy getAdmissionPolicy(String queueName); + protected abstract Resource getMinAllocation(); + + protected abstract Resource getMaxAllocation(); + + protected abstract ResourceCalculator getResourceCalculator(); + + protected abstract QueueMetrics getRootQueueMetrics(); + + protected abstract boolean getMoveOnExpiry(String planQueueName); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java index 7552e8c..afba7ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -21,13 +21,11 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -52,7 +50,7 @@ @Unstable public class CapacityOverTimePolicy implements SharingPolicy { - private CapacitySchedulerConfiguration conf; + private ReservationSchedulerConfiguration conf; private long validWindow; private float maxInst; private float maxAvg; @@ -61,13 +59,9 @@ // configuration structure of the schedulers (e.g., SchedulerConfiguration) // it should be easy to remove this limitation @Override - public void init(String reservationQueuePath, Configuration conf) { - if (!(conf instanceof CapacitySchedulerConfiguration)) { - throw new IllegalArgumentException("Unexpected conf type: " - + conf.getClass().getSimpleName() + " only supported conf is: " - + CapacitySchedulerConfiguration.class.getSimpleName()); - } - this.conf = (CapacitySchedulerConfiguration) conf; + public void init(String reservationQueuePath, + ReservationSchedulerConfiguration conf) { + this.conf = conf; validWindow = this.conf.getReservationWindow(reservationQueuePath); maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java index 9bce0d9..c37fb9b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java @@ -26,10 +26,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,41 +68,83 @@ public void reinitialize(Configuration conf, RMContext rmContext) @Override protected Plan initializePlan(String planQueueName) throws YarnException { - SharingPolicy adPolicy = getAdmissionPolicy(planQueueName); - String planQueuePath = capScheduler.getQueue(planQueueName).getQueuePath(); - adPolicy.init(planQueuePath, capScheduler.getConfiguration()); - CSQueue planQueue = capScheduler.getQueue(planQueueName); + String planQueuePath = getPlanQueuePath(planQueueName); + SharingPolicy adPolicy = getAdmissionPolicy(planQueuePath); + adPolicy.init(planQueuePath, getReservationSchedulerConfiguration()); // Calculate the max plan capacity - Resource minAllocation = capScheduler.getMinimumResourceCapability(); - ResourceCalculator rescCalc = capScheduler.getResourceCalculator(); - Resource totCap = - rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(), - planQueue.getAbsoluteCapacity(), minAllocation); + Resource minAllocation = getMinAllocation(); + Resource maxAllocation = getMaxAllocation(); + ResourceCalculator rescCalc = getResourceCalculator(); + Resource totCap = getPlanQueueCapacity(planQueueName); Plan plan = - new InMemoryPlan(capScheduler.getRootQueueMetrics(), adPolicy, + new InMemoryPlan(getRootQueueMetrics(), adPolicy, getAgent(planQueuePath), totCap, planStepSize, rescCalc, - minAllocation, capScheduler.getMaximumResourceCapability(), - planQueueName, getReplanner(planQueuePath), capScheduler - .getConfiguration().getMoveOnExpiry(planQueuePath)); + minAllocation, maxAllocation, planQueueName, + getReplanner(planQueuePath), getMoveOnExpiry (planQueuePath)); LOG.info("Intialized plan {0} based on reservable queue {1}", plan.toString(), planQueueName); return plan; } @Override + protected Resource getMinAllocation() { + return capScheduler.getMinimumResourceCapability(); + } + + @Override + protected Resource getMaxAllocation() { + return capScheduler.getMaximumResourceCapability(); + } + + @Override + protected ResourceCalculator getResourceCalculator() { + return capScheduler.getResourceCalculator(); + } + + @Override + protected QueueMetrics getRootQueueMetrics() { + return capScheduler.getRootQueueMetrics(); + } + + @Override + protected boolean getMoveOnExpiry(String planQueuePath) { + return capScheduler + .getConfiguration().getMoveOnExpiry(planQueuePath); + } + + @Override + protected String getPlanQueuePath(String planQueueName) { + return capScheduler.getQueue(planQueueName).getQueuePath(); + } + + @Override + protected Resource getPlanQueueCapacity(String planQueueName) { + Resource minAllocation = getMinAllocation(); + ResourceCalculator rescCalc = getResourceCalculator(); + CSQueue planQueue = capScheduler.getQueue(planQueueName); + return rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(), + planQueue.getAbsoluteCapacity(), minAllocation); + } + + @Override + protected ReservationSchedulerConfiguration + getReservationSchedulerConfiguration() { + return capScheduler.getConfiguration(); + } + + @Override protected Planner getReplanner(String planQueueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); - String plannerClassName = capSchedulerConfig.getReplanner(planQueueName); + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String plannerClassName = reservationConfig.getReplanner(planQueueName); LOG.info("Using Replanner: " + plannerClassName + " for queue: " + planQueueName); try { - Class plannerClazz = - capSchedulerConfig.getClassByName(plannerClassName); + Class plannerClazz = conf.getClassByName(plannerClassName); if (Planner.class.isAssignableFrom(plannerClazz)) { Planner planner = (Planner) ReflectionUtils.newInstance(plannerClazz, conf); - planner.init(planQueueName, capSchedulerConfig); + planner.init(planQueueName, reservationConfig); return planner; } else { throw new YarnRuntimeException("Class: " + plannerClazz @@ -116,12 +158,12 @@ protected Planner getReplanner(String planQueueName) { @Override protected ReservationAgent getAgent(String queueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); - String agentClassName = capSchedulerConfig.getReservationAgent(queueName); + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String agentClassName = reservationConfig.getReservationAgent(queueName); LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); try { - Class agentClazz = capSchedulerConfig.getClassByName(agentClassName); + Class agentClazz = conf.getClassByName(agentClassName); if (ReservationAgent.class.isAssignableFrom(agentClazz)) { return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); } else { @@ -136,15 +178,15 @@ protected ReservationAgent getAgent(String queueName) { @Override protected SharingPolicy getAdmissionPolicy(String queueName) { - CapacitySchedulerConfiguration capSchedulerConfig = - capScheduler.getConfiguration(); + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); String admissionPolicyClassName = - capSchedulerConfig.getReservationAdmissionPolicy(queueName); + reservationConfig.getReservationAdmissionPolicy(queueName); LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName + " for queue: " + queueName); try { Class admissionPolicyClazz = - capSchedulerConfig.getClassByName(admissionPolicyClassName); + conf.getClassByName(admissionPolicyClassName); if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { return (SharingPolicy) ReflectionUtils.newInstance( admissionPolicyClazz, conf); @@ -157,5 +199,6 @@ protected SharingPolicy getAdmissionPolicy(String queueName) { + admissionPolicyClassName + " for queue: " + queueName, e); } } - } + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java index 23f2be4..f87e9dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -20,7 +20,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -85,7 +84,8 @@ public long getValidWindow() { } @Override - public void init(String planQueuePath, Configuration conf) { + public void init(String planQueuePath, + ReservationSchedulerConfiguration conf) { // nothing to do for this policy } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java index d2b6184..57f28ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java @@ -20,7 +20,6 @@ import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -44,5 +43,5 @@ public void plan(Plan plan, List contracts) * @param planQueueName the name of the queue for this plan * @param conf the scheduler configuration */ - void init(String planQueueName, Configuration conf); + void init(String planQueueName, ReservationSchedulerConfiguration conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java new file mode 100644 index 0000000..98b213a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + + +public interface ReservationSchedulerConfiguration { + + boolean isReservable(String queue); + + long getReservationWindow(String queue); + + float getAverageCapacity(String queue); + + float getInstantaneousMaxCapacity(String queue); + + String getReservationAdmissionPolicy(String queue); + + String getReservationAgent(String queue); + + boolean getShowReservationAsQueues(String queuePath); + + String getReplanner(String queue); + + boolean getMoveOnExpiry(String queue); + + long getEnforcementWindow(String queue); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java index 3e5452e..8f8d24c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java @@ -20,7 +20,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; /** @@ -38,7 +37,7 @@ * @param planQueuePath the name of the queue for this plan * @param conf the system configuration */ - public void init(String planQueuePath, Configuration conf); + public void init(String planQueuePath, ReservationSchedulerConfiguration conf); /** * This method runs the policy validation logic, and return true/false on diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java index e38dd3cd..b5a6a99 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java @@ -25,11 +25,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -69,15 +67,9 @@ public SimpleCapacityReplanner() { } @Override - public void init(String planQueueName, Configuration conf) { - if (!(conf instanceof CapacitySchedulerConfiguration)) { - throw new IllegalArgumentException("Unexpected conf type: " - + conf.getClass().getSimpleName() + " only supported conf is: " - + CapacitySchedulerConfiguration.class.getSimpleName()); - } - this.lengthOfCheckZone = - ((CapacitySchedulerConfiguration) conf) - .getEnforcementWindow(planQueueName); + public void init(String planQueueName, + ReservationSchedulerConfiguration conf) { + this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index b1f239c..5583b98 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.*; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -31,11 +29,20 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation + .ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -public class CapacitySchedulerConfiguration extends Configuration { +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; + +public class CapacitySchedulerConfiguration extends Configuration implements ReservationSchedulerConfiguration { private static final Log LOG = LogFactory.getLog(CapacitySchedulerConfiguration.class); @@ -569,6 +576,7 @@ public boolean getOverrideWithQueueMappings() { return mappings; } + @Override public boolean isReservable(String queue) { boolean isReservable = getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false); @@ -581,6 +589,7 @@ public void setReservable(String queue, boolean isReservable) { + ", isReservableQueue=" + isReservable(queue)); } + @Override public long getReservationWindow(String queue) { long reservationWindow = getLong(getQueuePrefix(queue) + RESERVATION_WINDOW, @@ -588,6 +597,7 @@ public long getReservationWindow(String queue) { return reservationWindow; } + @Override public float getAverageCapacity(String queue) { float avgCapacity = getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, @@ -595,6 +605,7 @@ public float getAverageCapacity(String queue) { return avgCapacity; } + @Override public float getInstantaneousMaxCapacity(String queue) { float instMaxCapacity = getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY, @@ -615,6 +626,7 @@ public void setAverageCapacity(String queue, float avgCapacity) { setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity); } + @Override public String getReservationAdmissionPolicy(String queue) { String reservationPolicy = get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, @@ -627,6 +639,7 @@ public void setReservationAdmissionPolicy(String queue, set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy); } + @Override public String getReservationAgent(String queue) { String reservationAgent = get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, @@ -638,6 +651,7 @@ public void setReservationAgent(String queue, String reservationPolicy) { set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy); } + @Override public boolean getShowReservationAsQueues(String queuePath) { boolean showReservationAsQueues = getBoolean(getQueuePrefix(queuePath) @@ -645,6 +659,7 @@ public boolean getShowReservationAsQueues(String queuePath) { return showReservationAsQueues; } + @Override public String getReplanner(String queue) { String replanner = get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME, @@ -652,6 +667,7 @@ public String getReplanner(String queue) { return replanner; } + @Override public boolean getMoveOnExpiry(String queue) { boolean killOnExpiry = getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY, @@ -659,6 +675,7 @@ public boolean getMoveOnExpiry(String queue) { return killOnExpiry; } + @Override public long getEnforcementWindow(String queue) { long enforcementWindow = getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 71b5b8b..583f379 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; @@ -55,6 +56,18 @@ public static ReservationId getNewReservationId() { return ReservationId.newInstance(rand.nextLong(), rand.nextLong()); } + public static ReservationSchedulerConfiguration createConf( + String reservationQ, long timeWindow, float instConstraint, + float avgConstraint) { + ReservationSchedulerConfiguration conf = mock + (ReservationSchedulerConfiguration.class); + when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow); + when(conf.getInstantaneousMaxCapacity(reservationQ)).thenReturn + (instConstraint); + when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint); + return conf; + } + public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { // stolen from TestCapacityScheduler @@ -75,7 +88,7 @@ public CapacityScheduler mockCapacityScheduler(int numContainers) Assert.fail(e.getMessage()); } when(mockRmContext.getScheduler()).thenReturn(cs); - Resource r = Resource.newInstance(numContainers * 1024, numContainers); + Resource r = calculateClusterResource(numContainers); doReturn(r).when(cs).getClusterResource(); return cs; } @@ -224,4 +237,9 @@ public static ReservationDefinition generateBigRR(Random rand, long i) { return req; } + public static Resource calculateClusterResource(int numContainers) { + Resource clusterResource = Resource.newInstance(numContainers * 1024, + numContainers); + return clusterResource; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index 451a155..61561e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Map; @@ -29,8 +30,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; @@ -74,18 +75,18 @@ public void setup() throws Exception { mAgent = mock(ReservationAgent.class); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); + QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint); - capConf.setAverageCapacity(reservationQ, avgConstraint); + Resource clusterResource = testUtil.calculateClusterResource(totCont); + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = - new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, - scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + new InMemoryPlan(rootQueueMetrics, policy, mAgent, + clusterResource, step, res, minAlloc, maxAlloc, "dedicated", null, true); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java index f1e1e5c..b8cf6c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java @@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -71,18 +69,19 @@ public void setup() throws Exception { Resource clusterCapacity = Resource.newInstance(100 * 1024, 100); step = 1000L; ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setMaximumCapacity(reservationQ, 100); - capConf.setAverageCapacity(reservationQ, 100); + + float instConstraint = 100; + float avgConstraint = 100; + + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); agent = new GreedyReservationAgent(); - QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated", - mock(ParentQueue.class), false, capConf); + QueueMetrics queueMetrics = mock(QueueMetrics.class); plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); @@ -549,12 +548,13 @@ public void testStress(int numJobs) throws PlanningException, IOException { ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); - capConf.setReservationWindow(reservationQ, timeWindow); - capConf.setMaximumCapacity(reservationQ, 100); - capConf.setAverageCapacity(reservationQ, 100); + float instConstraint = 100; + float avgConstraint = 100; + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent, clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java index 9389f12..1e15618 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -27,8 +27,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Before; @@ -60,15 +59,17 @@ public void setup() throws Exception { mAgent = mock(ReservationAgent.class); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); + QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); + Resource clusterResource = testUtil.calculateClusterResource(totCont); + ReservationSchedulerConfiguration conf = mock + (ReservationSchedulerConfiguration.class); NoOverCommitPolicy policy = new NoOverCommitPolicy(); - policy.init(reservationQ, capConf); + policy.init(reservationQ, conf); plan = - new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, - scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + new InMemoryPlan(rootQueueMetrics, policy, mAgent, + clusterResource, step, res, minAlloc, maxAlloc, "dedicated", null, true); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java index c94ef69..1ca9f2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -60,13 +59,10 @@ public void testReplanningPlanCapacityLoss() throws PlanningException { when(clock.getTime()).thenReturn(0L); SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock); - CapacitySchedulerConfiguration conf = - mock(CapacitySchedulerConfiguration.class); + ReservationSchedulerConfiguration conf = + mock(ReservationSchedulerConfiguration.class); when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L); - conf.setLong(CapacitySchedulerConfiguration.PREFIX + "blah" - + CapacitySchedulerConfiguration.DOT - + CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW, 6); enf.init("blah", conf); // Initialize the plan with more resources