diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 751c9a38e36..b44e2bf650a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -53,6 +53,16 @@ LogMutation logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws Exception; + /** + * Apply the changes on top of the actual configuration. + * @param oldConfiguration actual configuration + * @param confUpdate changelist + * @return new configuration with the applied changed + * @throws IOException if the merge failed + */ + Configuration applyChanges(Configuration oldConfiguration, + SchedConfUpdateInfo confUpdate) throws IOException; + /** * Confirm last logged mutation. * @param pendingMutation the log mutation to apply diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index fcc05602871..174a699545a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -207,40 +207,9 @@ public void setConf(Configuration conf) { private void validateConf(Configuration conf) { // validate scheduler memory allocation setting - int minMem = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - int maxMem = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - - if (minMem <= 0 || minMem > maxMem) { - throw new YarnRuntimeException("Invalid resource scheduler memory" - + " allocation configuration" - + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB - + "=" + minMem - + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB - + "=" + maxMem + ", min and max should be greater than 0" - + ", max should be no smaller than min."); - } - + CapacitySchedulerConfigValidator.validateMemoryAllocation(conf); // validate scheduler vcores allocation setting - int minVcores = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - int maxVcores = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - - if (minVcores <= 0 || minVcores > maxVcores) { - throw new YarnRuntimeException("Invalid resource scheduler vcores" - + " allocation configuration" - + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES - + "=" + minVcores - + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - + "=" + maxVcores + ", min and max should be greater than 0" - + ", max should be no smaller than min."); - } + CapacitySchedulerConfigValidator.validateVCores(conf); } @Override @@ -480,14 +449,17 @@ public void serviceStop() throws Exception { super.serviceStop(); } - @Override - public void reinitialize(Configuration newConf, RMContext rmContext) - throws IOException { + public void reinitialize(Configuration newConf, RMContext rmContext, + boolean validation) throws IOException { writeLock.lock(); try { Configuration configuration = new Configuration(newConf); CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = csConfProvider.loadConfiguration(configuration); + if (validation) { + this.conf = new CapacitySchedulerConfiguration(newConf, false); + } else { + this.conf = csConfProvider.loadConfiguration(configuration); + } validateConf(this.conf); try { LOG.info("Re-initializing queues..."); @@ -501,17 +473,26 @@ public void reinitialize(Configuration newConf, RMContext rmContext) throw new IOException("Failed to re-init queues : " + t.getMessage(), t); } + if (!validation) { - // update lazy preemption - this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled(); + // update lazy preemption + this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled(); - // Setup how many containers we can allocate for each round - offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + // Setup how many containers we can allocate for each round + offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); - super.reinitialize(newConf, rmContext); + super.reinitialize(newConf, rmContext); + } } finally { writeLock.unlock(); } + + } + + @Override + public void reinitialize(Configuration newConf, RMContext rmContext) + throws IOException { + reinitialize(newConf, rmContext, false); } long getAsyncScheduleInterval() { @@ -714,19 +695,13 @@ public void updatePlacementRules() throws IOException { Collection placementRuleStrs = conf.getStringCollection( YarnConfiguration.QUEUE_PLACEMENT_RULES); List placementRules = new ArrayList<>(); - Set distingushRuleSet = new HashSet<>(); - // fail the case if we get duplicate placementRule add in - for (String pls : placementRuleStrs) { - if (!distingushRuleSet.add(pls)) { - throw new IOException("Invalid PlacementRule inputs which " - + "contains duplicate rule strings"); - } - } + Set distinguishRuleSet = CapacitySchedulerConfigValidator + .validatePlacementRules(placementRuleStrs); // add UserGroupMappingPlacementRule if absent - distingushRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + distinguishRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); - placementRuleStrs = new ArrayList<>(distingushRuleSet); + placementRuleStrs = new ArrayList<>(distinguishRuleSet); for (String placementRuleStr : placementRuleStrs) { switch (placementRuleStr) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java new file mode 100644 index 00000000000..03f63a6f413 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public final class CapacitySchedulerConfigValidator { + private static final Logger LOG = LoggerFactory.getLogger( + CapacitySchedulerConfigValidator.class); + + private CapacitySchedulerConfigValidator() { + + throw new IllegalStateException("Utility class"); + } + + public static boolean validateCSConfiguration( + final Configuration oldConf, final Configuration newConf, + final RMContext rmContext) throws IOException { + + //TODO: extract all the validation steps and replace reinitialize with + //the specific validation steps + CapacityScheduler newCs = new CapacityScheduler(); + newCs.setConf(oldConf); + newCs.setRMContext(rmContext); + newCs.init(oldConf); + newCs.reinitialize(newConf, rmContext, true); + return true; + } + + public static Set validatePlacementRules( + Collection placementRuleStrs) throws IOException { + + Set distinguishRuleSet = new HashSet<>(); + // fail the case if we get duplicate placementRule add in + for (String pls : placementRuleStrs) { + if (!distinguishRuleSet.add(pls)) { + throw new IOException("Invalid PlacementRule inputs which " + + "contains duplicate rule strings"); + } + } + return distinguishRuleSet; + } + + public static void validateMemoryAllocation(Configuration conf) { + int minMem = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + int maxMem = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + + if (minMem <= 0 || minMem > maxMem) { + throw new YarnRuntimeException("Invalid resource scheduler memory" + + " allocation configuration" + + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + + "=" + minMem + + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + + "=" + maxMem + ", min and max should be greater than 0" + + ", max should be no smaller than min."); + } + } + public static void validateVCores(Configuration conf) { + int minVcores = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + int maxVcores = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + if (minVcores <= 0 || minVcores > maxVcores) { + throw new YarnRuntimeException("Invalid resource scheduler vcores" + + " allocation configuration" + + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES + + "=" + minVcores + + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + + "=" + maxVcores + ", min and max should be greater than 0" + + ", max should be no smaller than min."); + } + } + + /** + * Ensure all existing queues are present. Queues cannot be deleted if its not + * in Stopped state, Queue's cannot be moved from one hierarchy to other also. + * Previous child queue could be converted into parent queue if it is in + * STOPPED state. + * + * @param queues existing queues + * @param newQueues new queues + */ + public static void validateQueueHierarchy(Map queues, + Map newQueues, CapacitySchedulerConfiguration newConf) + throws IOException { + LOG.error("Old queues:"); + for (Map.Entry queue:queues.entrySet()) { + LOG.error(queue.getKey() + ": " + queue.getValue()); + } + LOG.error("New queues:"); + for (Map.Entry queue:newQueues.entrySet()) { + LOG.error(queue.getKey() + ": " + queue.getValue()); + } + // check that all static queues are included in the newQueues list + for (Map.Entry e : queues.entrySet()) { + if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue() + .getClass()))) { + String queueName = e.getKey(); + CSQueue oldQueue = e.getValue(); + CSQueue newQueue = newQueues.get(queueName); + if (null == newQueue) { + // old queue doesn't exist in the new XML + String configPrefix = newConf.getQueuePrefix( + oldQueue.getQueuePath()); + QueueState newQueueState = null; + try { + newQueueState = QueueState.valueOf( + newConf.get(configPrefix + "state")); + } catch (Exception ex) { + LOG.warn("Not a valid queue state for queue " + + oldQueue.getQueuePath()); + } + if (oldQueue.getState() == QueueState.STOPPED || + newQueueState == QueueState.STOPPED) { + LOG.info("Deleting Queue " + queueName + ", as it is not" + + " present in the modified capacity configuration xml"); + } else{ + throw new IOException(oldQueue.getQueuePath() + " cannot be" + + " deleted from the capacity scheduler configuration, " + + "as the queue is not yet in stopped state. " + + "Current State : " + oldQueue.getState()); + } + } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { + //Queue's cannot be moved from one hierarchy to other + throw new IOException( + queueName + " is moved from:" + oldQueue.getQueuePath() + + " to:" + newQueue.getQueuePath() + + " after refresh, which is not allowed."); + } else if (oldQueue instanceof ParentQueue + && !(oldQueue instanceof ManagedParentQueue) + && newQueue instanceof ManagedParentQueue) { + throw new IOException( + "Can not convert parent queue: " + oldQueue.getQueuePath() + + " to auto create enabled parent queue since " + + "it could have other pre-configured queues which is" + + " not supported"); + } else if (oldQueue instanceof ManagedParentQueue + && !(newQueue instanceof ManagedParentQueue)) { + throw new IOException( + "Cannot convert auto create enabled parent queue: " + oldQueue + .getQueuePath() + " to leaf queue. Please check " + + " parent queue's configuration " + + CapacitySchedulerConfiguration + .AUTO_CREATE_CHILD_QUEUE_ENABLED + + " is set to true"); + } else if (oldQueue instanceof LeafQueue + && newQueue instanceof ParentQueue) { + if (oldQueue.getState() == QueueState.STOPPED) { + LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath() + + " to parent queue."); + } else{ + throw new IOException( + "Can not convert the leaf queue: " + oldQueue.getQueuePath() + + " to parent queue since " + + "it is not yet in stopped state. Current State : " + + oldQueue.getState()); + } + } else if (oldQueue instanceof ParentQueue + && newQueue instanceof LeafQueue) { + LOG.info("Converting the parent queue: " + oldQueue.getQueuePath() + + " to leaf queue."); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index d9b3ebd2eda..1bbc7ca891e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -36,7 +36,6 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.Permission; @@ -177,7 +176,8 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) csContext.getRMContext().getHAServiceState() != HAServiceProtocol.HAServiceState.STANDBY) { // Ensure queue hierarchy in the new XML file is proper. - validateQueueHierarchy(queues, newQueues, newConf); + CapacitySchedulerConfigValidator + .validateQueueHierarchy(queues, newQueues, newConf); } // Add new queues and delete OldQeueus only after validation. @@ -299,90 +299,6 @@ static CSQueue parseQueue( return queue; } - /** - * Ensure all existing queues are present. Queues cannot be deleted if its not - * in Stopped state, Queue's cannot be moved from one hierarchy to other also. - * Previous child queue could be converted into parent queue if it is in - * STOPPED state. - * - * @param queues existing queues - * @param newQueues new queues - */ - private void validateQueueHierarchy(Map queues, - Map newQueues, CapacitySchedulerConfiguration newConf) - throws IOException { - // check that all static queues are included in the newQueues list - for (Map.Entry e : queues.entrySet()) { - if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue() - .getClass()))) { - String queueName = e.getKey(); - CSQueue oldQueue = e.getValue(); - CSQueue newQueue = newQueues.get(queueName); - if (null == newQueue) { - // old queue doesn't exist in the new XML - String configPrefix = newConf.getQueuePrefix( - oldQueue.getQueuePath()); - QueueState newQueueState = null; - try { - newQueueState = QueueState.valueOf( - newConf.get(configPrefix + "state")); - } catch (Exception ex) { - LOG.warn("Not a valid queue state for queue " - + oldQueue.getQueuePath()); - } - if (oldQueue.getState() == QueueState.STOPPED || - newQueueState == QueueState.STOPPED) { - LOG.info("Deleting Queue " + queueName + ", as it is not" - + " present in the modified capacity configuration xml"); - } else{ - throw new IOException(oldQueue.getQueuePath() + " cannot be" - + " deleted from the capacity scheduler configuration, as the" - + " queue is not yet in stopped state. Current State : " - + oldQueue.getState()); - } - } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { - //Queue's cannot be moved from one hierarchy to other - throw new IOException( - queueName + " is moved from:" + oldQueue.getQueuePath() + " to:" - + newQueue.getQueuePath() - + " after refresh, which is not allowed."); - } else if (oldQueue instanceof ParentQueue - && !(oldQueue instanceof ManagedParentQueue) - && newQueue instanceof ManagedParentQueue) { - throw new IOException( - "Can not convert parent queue: " + oldQueue.getQueuePath() - + " to auto create enabled parent queue since " - + "it could have other pre-configured queues which is not " - + "supported"); - } else if (oldQueue instanceof ManagedParentQueue - && !(newQueue instanceof ManagedParentQueue)) { - throw new IOException( - "Cannot convert auto create enabled parent queue: " + oldQueue - .getQueuePath() + " to leaf queue. Please check " - + " parent queue's configuration " - + CapacitySchedulerConfiguration - .AUTO_CREATE_CHILD_QUEUE_ENABLED - + " is set to true"); - } else if (oldQueue instanceof LeafQueue - && newQueue instanceof ParentQueue) { - if (oldQueue.getState() == QueueState.STOPPED) { - LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath() - + " to parent queue."); - } else{ - throw new IOException( - "Can not convert the leaf queue: " + oldQueue.getQueuePath() - + " to parent queue since " - + "it is not yet in stopped state. Current State : " - + oldQueue.getState()); - } - } else if (oldQueue instanceof ParentQueue - && newQueue instanceof LeafQueue) { - LOG.info("Converting the parent queue: " + oldQueue.getQueuePath() - + " to leaf queue."); - } - } - } - } /** * Updates to our list of queues: Adds the new queues and deletes the removed diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index 09146408fb3..91bc47a9669 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -131,17 +131,35 @@ public ConfigurationMutationACLPolicy getAclMutationPolicy() { public LogMutation logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws Exception { oldConf = new Configuration(schedConf); - Map kvUpdate = constructKeyValueConfUpdate(confUpdate); + CapacitySchedulerConfiguration proposedConf = + new CapacitySchedulerConfiguration(schedConf, false); + Map kvUpdate + = constructKeyValueConfUpdate(proposedConf, confUpdate); LogMutation log = new LogMutation(kvUpdate, user.getShortUserName()); confStore.logMutation(log); + applyMutation(proposedConf, kvUpdate); + schedConf = proposedConf; + return log; + } + + public Configuration applyChanges(Configuration oldConfiguration, + SchedConfUpdateInfo confUpdate) throws IOException { + CapacitySchedulerConfiguration proposedConf = + new CapacitySchedulerConfiguration(oldConfiguration, false); + Map kvUpdate + = constructKeyValueConfUpdate(proposedConf, confUpdate); + applyMutation(proposedConf, kvUpdate); + return proposedConf; + } + + private void applyMutation(Configuration conf, Map kvUpdate) { for (Map.Entry kv : kvUpdate.entrySet()) { if (kv.getValue() == null) { - schedConf.unset(kv.getKey()); + conf.unset(kv.getKey()); } else { - schedConf.set(kv.getKey(), kv.getValue()); + conf.set(kv.getKey(), kv.getValue()); } } - return log; } @Override @@ -217,9 +235,9 @@ public void reloadConfigurationFromStore() throws Exception { } private Map constructKeyValueConfUpdate( + CapacitySchedulerConfiguration proposedConf, SchedConfUpdateInfo mutationInfo) throws IOException { - CapacitySchedulerConfiguration proposedConf = - new CapacitySchedulerConfiguration(schedConf, false); + Map confUpdate = new HashMap<>(); for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { removeQueue(queueToRemove, proposedConf, confUpdate); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java index b7381038855..30406e547d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java @@ -57,6 +57,12 @@ /** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */ public static final String SCHEDULER_LOGS = "/scheduler/logs"; + /** + * Path for {@code RMWebServiceProtocol#validateAndGetSchedulerConfiguration}. + */ + public static final String SCHEDULER_CONF_VALIDATE + = "/scheduler-conf/validate"; + /** Path for {@code RMWebServiceProtocol#getNodes}. */ public static final String NODES = "/nodes"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index bdd8e6456ab..bb4cd541085 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -148,6 +148,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigValidator;import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; @@ -2618,6 +2619,52 @@ public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr) } } + @POST + @Path(RMWSConsts.SCHEDULER_CONF_VALIDATE) + @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public synchronized Response validateAndGetSchedulerConfiguration( + SchedConfUpdateInfo mutationInfo, + @Context HttpServletRequest hsr) throws AuthorizationException { + // Only admin user is allowed to read scheduler conf, + // in order to avoid leaking sensitive info, such as ACLs + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + initForWritableEndpoints(callerUGI, true); + ResourceScheduler scheduler = rm.getResourceScheduler(); + if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler) + scheduler).isConfigurationMutable()) { + try { + MutableConfigurationProvider mutableConfigurationProvider = + ((MutableConfScheduler) scheduler).getMutableConfProvider(); + Configuration schedulerConf = mutableConfigurationProvider + .getConfiguration(); + Configuration newConfig = mutableConfigurationProvider + .applyChanges(schedulerConf, mutationInfo); + CapacitySchedulerConfigValidator.validateCSConfiguration(schedulerConf, + newConfig, rm.getRMContext()); + + return Response.status(Status.OK) + .entity(new ConfInfo(newConfig)) + .build(); + } catch (Exception e) { + String errorMsg = "CapacityScheduler configuration validation failed:" + + e.toString(); + LOG.warn(errorMsg); + return Response.status(Status.BAD_REQUEST) + .entity(errorMsg) + .build(); + } + } else { + String errorMsg = "Configuration change validation only supported by " + + "MutableConfScheduler."; + LOG.warn(errorMsg); + return Response.status(Status.BAD_REQUEST) + .entity(errorMsg) + .build(); + } + } + @PUT @Path(RMWSConsts.SCHEDULER_CONF) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java new file mode 100644 index 00000000000..a60c1d88415 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.LocalConfigurationProvider; +import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.fail; + +public class TestCapacitySchedulerConfigValidator { + + /** + * Test for the case when the scheduler.minimum-allocation-mb == 0. + */ + @Test (expected = YarnRuntimeException.class) + public void testValidateMemoryAllocationInvalidMinMem() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "0"); + Configuration config = createConfiguration(configs); + CapacitySchedulerConfigValidator.validateMemoryAllocation(config); + fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + + " should be > 0"); + } + + /** + * Test for the case when the scheduler.minimum-allocation-mb is greater than + * scheduler.maximum-allocation-mb. + */ + @Test (expected = YarnRuntimeException.class) + public void testValidateMemoryAllocationHIgherMinThanMaxMem() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "8192"); + configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "1024"); + Configuration config = createConfiguration(configs); + CapacitySchedulerConfigValidator.validateMemoryAllocation(config); + fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + " should be > " + + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + + } + + + @Test + public void testValidateMemoryAllocation() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "1024"); + configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "8192"); + Configuration config = createConfiguration(configs); + CapacitySchedulerConfigValidator.validateMemoryAllocation(config); + } + + /** + * Test for the case when the scheduler.minimum-allocation-mb == 0. + */ + @Test (expected = YarnRuntimeException.class) + public void testValidateVCoresInvalidMinVCore() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "0"); + Configuration config = createConfiguration(configs); + CapacitySchedulerConfigValidator.validateVCores(config); + fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES + + " should be > 0"); + } + + /** + * Test for the case when the scheduler.minimum-allocation-mb is greater than + * scheduler.maximum-allocation-mb. + */ + @Test (expected = YarnRuntimeException.class) + public void testValidateVCoresHigherMinThanMaxVCore() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4"); + configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "1"); + Configuration config = createConfiguration(configs); + CapacitySchedulerConfigValidator.validateVCores(config); + fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES + + " should be > " + + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + + } + + + @Test + public void testValidateVCores() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "1"); + configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "4"); + Configuration config = createConfiguration(configs); + CapacitySchedulerConfigValidator.validateVCores(config); + } + + @Test + public void testValidateCSConfigInvalidCapacity() { + Configuration oldConfig = createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.test1.capacity", "500"); + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("Invalid capacity"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .startsWith("Illegal capacity")); + } + } + + @Test + public void testValidateCSConfigStopALeafQueue() throws IOException { + Configuration oldConfig = createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.test1.state", "STOPPED"); + RMContext rmContext = prepareRMContext(); + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + } + + /** + * Stop a leaf queue if there are running child queues. + */ + @Test + public void testValidateCSConfigStopANonLeafQueueInvalid() { + Configuration oldConfig = createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.state", "STOPPED"); + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("There are child queues in running state"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .contains("The parent queue:root cannot be STOPPED")); + } + } + + @Test + public void testValidateCSConfigStopANonLeafQueue() throws IOException { + Configuration oldConfig = createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.state", "STOPPED"); + newConfig + .set("yarn.scheduler.capacity.root.test1.state", "STOPPED"); + newConfig + .set("yarn.scheduler.capacity.root.test2.state", "STOPPED"); + RMContext rmContext = prepareRMContext(); + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + + } + + /** + * Add a leaf queue without modifying the capacity of other leaf queues + * so the total capacity != 100. + */ + @Test + public void testValidateCSConfigAddALeafQueueInvalid() { + Configuration oldConfig = createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3"); + newConfig + .set("yarn.scheduler.capacity.root.test3", "STARTED"); + newConfig + .set("yarn.scheduler.capacity.root.test3", "STARTED"); + newConfig + .set("yarn.scheduler.capacity.root.test3.capacity", "30"); + + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("Invalid capacity for children of queue root"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .startsWith("Illegal capacity")); + } + } + + /** + * Add a leaf queue by modifying the capacity of other leaf queues + * and adjust the capacities of other leaf queues, so total capacity = 100. + */ + @Test + public void testValidateCSConfigAddALeafQueueValid() throws IOException { + Configuration oldConfig = createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3"); + newConfig + .set("yarn.scheduler.capacity.root.test3", "STARTED"); + newConfig + .set("yarn.scheduler.capacity.root.test3", "STARTED"); + newConfig + .set("yarn.scheduler.capacity.root.test3.capacity", "30"); + newConfig + .set("yarn.scheduler.capacity.root.test1.capacity", "20"); + + RMContext rmContext = prepareRMContext(); + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + } + + /** + * Delete a running queue. + */ + @Test + public void testValidateCSConfigInvalidQueueDeletion() { + Configuration oldConfig = createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig.set("yarn.scheduler.capacity.root.queues", "test1"); + newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100"); + newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity"); + newConfig.unset("yarn.scheduler.capacity.root.test2.state"); + newConfig.set("yarn.scheduler.capacity.queue-mappings", + "u:test1:test1"); + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("Invalid capacity for children of queue root"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .contains("root.test2 cannot be deleted")); + Assert.assertTrue(e.getCause().getMessage() + .contains("the queue is not yet in stopped state")); + } + } + + /** + * Delete a queue and not adjust capacities. + */ + @Test + public void testValidateCSConfigInvalidQueueDeletion2() { + Configuration oldConfig = createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig.set("yarn.scheduler.capacity.root.queues", "test1"); + newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity"); + newConfig.unset("yarn.scheduler.capacity.root.test2.state"); + newConfig.set("yarn.scheduler.capacity.queue-mappings", + "u:test1:test1"); + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("Invalid capacity for children of queue root"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .contains("Illegal capacity")); + } + } + + /** + * Delete a queue and adjust capacities to have total capacity = 100. + */ + @Test + public void testValidateCSConfigValidQueueDeletion() throws IOException { + Configuration oldConfig = createBasicCSConfiguration(); + oldConfig.set("yarn.scheduler.capacity.root.test2.state", "STOPPED"); + Configuration newConfig = new Configuration(oldConfig); + newConfig.set("yarn.scheduler.capacity.root.queues", "test1"); + newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100"); + newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity"); + newConfig.unset("yarn.scheduler.capacity.root.test2.state"); + newConfig.set("yarn.scheduler.capacity.queue-mappings", + "u:test1:test1"); + RMContext rmContext = prepareRMContext(); + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + + } + + @Test + public void testAddQueueToALeafQueue() throws IOException { + Configuration oldConfig = createBasicCSConfiguration(); + oldConfig.set("yarn.scheduler.capacity.root.test1.state", "STOPPED"); + Configuration newConfig = new Configuration(oldConfig); + newConfig.set("yarn.scheduler.capacity.root.test1.queues", "newQueue"); + newConfig + .set("yarn.scheduler.capacity.root.test1.newQueue.capacity", "100"); + newConfig.set("yarn.scheduler.capacity.queue-mappings", + "u:test1:test2"); + RMContext rmContext = prepareRMContext(); + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + } + + private Configuration createConfiguration(Map configs) { + Configuration config = new Configuration(); + for (Map.Entry entry: configs.entrySet()) { + config.set((String)entry.getKey(), (String)entry.getValue()); + } + return config; + } + + private Configuration createBasicCSConfiguration() { + Map conf = new HashMap<>(); + conf.put("yarn.scheduler.capacity.root.queues", "test1, test2"); + conf.put("yarn.scheduler.capacity.root.test1.capacity", "50"); + conf.put("yarn.scheduler.capacity.root.test2.capacity", "50"); + conf.put("yarn.scheduler.capacity.root.test1.maximum-capacity", "100"); + conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING"); + conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING"); + conf.put("yarn.scheduler.capacity.queue-mappings", + "u:test1:test1,u:test2:test2"); + return createConfiguration(conf); + } + + private RMContext prepareRMContext() { + RMContext rmContext = Mockito.mock(RMContext.class); + LocalConfigurationProvider configProvider = Mockito + .mock(LocalConfigurationProvider.class); + Mockito.when(rmContext.getConfigurationProvider()) + .thenReturn(configProvider); + RMNodeLabelsManager nodeLabelsManager = Mockito + .mock(RMNodeLabelsManager.class); + Mockito.when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager); + LightWeightResource partitionResource = Mockito + .mock(LightWeightResource.class); + Mockito.when(nodeLabelsManager + .getResourceByLabel(Mockito.any(), Mockito.any())) + .thenReturn(partitionResource); + PlacementManager queuePlacementManager = Mockito + .mock(PlacementManager.class); + Mockito.when(rmContext.getQueuePlacementManager()) + .thenReturn(queuePlacementManager); + return rmContext; + } +}