diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/scheduler.xml new file mode 100644 index 0000000..1a5d51b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/scheduler.xml @@ -0,0 +1,45 @@ + + + + + + capacity + 10000 + + org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator + + + false + + + 0.1 + + + + + RUNNING + * + * + + + 1 + 100 + 100 + 40 + + + + + \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java index 2af1ffd..76aee95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -19,10 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; -public abstract class ReservationSchedulerConfiguration extends Configuration { +public interface ReservationSchedulerConfiguration { @InterfaceAudience.Private public static final long DEFAULT_RESERVATION_WINDOW = 24*60*60*1000; // 1 day in msec @@ -53,13 +52,6 @@ @InterfaceAudience.Private public static final float DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER = 1; - public ReservationSchedulerConfiguration() { super(); } - - public ReservationSchedulerConfiguration( - Configuration configuration) { - super(configuration); - } - /** * Checks if the queue participates in reservation based scheduling * @param queue @@ -74,9 +66,7 @@ public ReservationSchedulerConfiguration( * @return length in time in milliseconds for which to check the * {@link SharingPolicy} */ - public long getReservationWindow(String queue) { - return DEFAULT_RESERVATION_WINDOW; - } + public long getReservationWindow(String queue); /** * Gets the average allowed capacity which will aggregated over the @@ -85,27 +75,21 @@ public long getReservationWindow(String queue) { * @param queue name of the queue * @return average capacity allowed by the {@link SharingPolicy} */ - public float getAverageCapacity(String queue) { - return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER; - } + public float getAverageCapacity(String queue); /** * Gets the maximum capacity at any time that the {@link SharingPolicy} allows * @param queue name of the queue * @return maximum allowed capacity at any time */ - public float getInstantaneousMaxCapacity(String queue) { - return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER; - } + public float getInstantaneousMaxCapacity(String queue); /** * Gets the name of the {@link SharingPolicy} class associated with the queue * @param queue name of the queue * @return the class name of the {@link SharingPolicy} */ - public String getReservationAdmissionPolicy(String queue) { - return DEFAULT_RESERVATION_ADMISSION_POLICY; - } + public String getReservationAdmissionPolicy(String queue); /** * Gets the name of the {@link ReservationAgent} class associated with the @@ -113,18 +97,14 @@ public String getReservationAdmissionPolicy(String queue) { * @param queue name of the queue * @return the class name of the {@link ReservationAgent} */ - public String getReservationAgent(String queue) { - return DEFAULT_RESERVATION_AGENT_NAME; - } + public String getReservationAgent(String queue); /** * Checks whether the reservation queues be hidden or visible * @param queuePath name of the queue * @return true if reservation queues should be visible */ - public boolean getShowReservationAsQueues(String queuePath) { - return DEFAULT_SHOW_RESERVATIONS_AS_QUEUES; - } + public boolean getShowReservationAsQueues(String queuePath); /** * Gets the name of the {@link Planner} class associated with the @@ -132,9 +112,7 @@ public boolean getShowReservationAsQueues(String queuePath) { * @param queue name of the queue * @return the class name of the {@link Planner} */ - public String getReplanner(String queue) { - return DEFAULT_RESERVATION_PLANNER_NAME; - } + public String getReplanner(String queue); /** * Gets whether the applications should be killed or moved to the parent queue @@ -143,9 +121,7 @@ public String getReplanner(String queue) { * @return true if application should be moved, false if they need to be * killed */ - public boolean getMoveOnExpiry(String queue) { - return DEFAULT_RESERVATION_MOVE_ON_EXPIRY; - } + public boolean getMoveOnExpiry(String queue); /** * Gets the time in milliseconds for which the {@link Planner} will verify @@ -153,7 +129,5 @@ public boolean getMoveOnExpiry(String queue) { * @param queue name of the queue * @return the time in milliseconds for which to check constraints */ - public long getEnforcementWindow(String queue) { - return DEFAULT_RESERVATION_ENFORCEMENT_WINDOW; - } + public long getEnforcementWindow(String queue); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSConfiguration.java new file mode 100644 index 0000000..879ea13 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSConfiguration.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.security.AccessType; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +public interface AbstractCSConfiguration extends + ReservationSchedulerConfiguration { + @Private + public static final String DOT = "."; + + @Private + public static final String MAXIMUM_APPLICATIONS = + "maximum-applications"; + + @Private + public static final String MAXIMUM_AM_RESOURCE = + "maximum-am-resource-percent"; + + @Private + public static final String CAPACITY = "capacity"; + + @Private + public static final String MAXIMUM_CAPACITY = "maximum-capacity"; + + @Private + public static final String USER_LIMIT = "minimum-user-limit-percent"; + + @Private + public static final String USER_LIMIT_FACTOR = "user-limit-factor"; + + @Private + public static final String STATE = "state"; + + @Private + public static final String DEFAULT_NODE_LABEL_EXPRESSION = + "default-node-label-expression"; + + public static final String RESERVE_CONT_LOOK_ALL_NODES_SUFFIX = + "reservations-continue-look-all-nodes"; + + @Private + public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true; + + @Private + public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb"; + + @Private + public static final String MAXIMUM_ALLOCATION_VCORES = + "maximum-allocation-vcores"; + + @Private + public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; + + @Private + public static final float + DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f; + + @Private + public static final float UNDEFINED = -1; + + @Private + public static final float MINIMUM_CAPACITY_VALUE = 0; + + @Private + public static final float MAXIMUM_CAPACITY_VALUE = 100; + + @Private + public static final float DEFAULT_MAXIMUM_CAPACITY_VALUE = -1.0f; + + @Private + public static final int DEFAULT_USER_LIMIT = 100; + + @Private + public static final float DEFAULT_USER_LIMIT_FACTOR = 1.0f; + + @Private + public static final String ALL_ACL = "*"; + + @Private + public static final String NONE_ACL = " "; + + @Private + public static final String ENABLE_USER_METRICS = "user-metrics.enable"; + + @Private + public static final boolean DEFAULT_ENABLE_USER_METRICS = false; + + /** ResourceComparator for scheduling. */ + @Private + public static final String RESOURCE_CALCULATOR_CLASS = "resource-calculator"; + + @Private public static final Class + DEFAULT_RESOURCE_CALCULATOR_CLASS = DefaultResourceCalculator.class; + + @Private + public static final String ROOT = "root"; + + @Private + public static final String NODE_LOCALITY_DELAY = "node-locality-delay"; + + @Private + public static final int DEFAULT_NODE_LOCALITY_DELAY = -1; + + @Private + public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX = + "schedule-asynchronously"; + + @Private + public static final String SCHEDULE_ASYNCHRONOUSLY_ENABLE = + SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable"; + + @Private + public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false; + + @Private + public static final String QUEUE_MAPPING = "queue-mappings"; + + @Private + public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable"; + + @Private + public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false; + + @Private + public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; + + @Private + public static final String AVERAGE_CAPACITY = "average-capacity"; + + @Private + public static final String IS_RESERVABLE = "reservable"; + + @Private + public static final String RESERVATION_WINDOW = "reservation-window"; + + @Private + public static final String INSTANTANEOUS_MAX_CAPACITY = + "instantaneous-max-capacity"; + + @Private + public static final String RESERVATION_ADMISSION_POLICY = + "reservation-policy"; + + @Private + public static final String RESERVATION_AGENT_NAME = "reservation-agent"; + + @Private + public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE = + "show-reservations-as-queues"; + + @Private + public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; + + @Private + public static final String RESERVATION_MOVE_ON_EXPIRY = + "reservation-move-on-expiry"; + + @Private + public static final String RESERVATION_ENFORCEMENT_WINDOW = + "reservation-enforcement-window"; + + @Private + public static final String ASYNC_SCHEDULER_INTERVAL = "scheduling-interval-ms"; + + @Private + public static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; + + @Private + public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels"; + + + public int getMaximumSystemApplications(); + + public float getMaximumApplicationMasterResourcePercent(); + + /** + * Get the maximum applications per queue setting. + * @param queue name of the queue + * @return setting specified or -1 if not set + */ + public int getMaximumApplicationsPerQueue(String queue); + + public void setMaximumApplicationsPerQueue(String queue, int value); + + /** + * Get the maximum am resource percent per queue setting. + * @param queue name of the queue + * @return per queue setting or defaults to the global am-resource-percent + * setting if per queue setting not present + */ + public float getMaximumApplicationMasterResourcePerQueuePercent(String queue); + + public void setMaximumApplicationMasterResourcePerQueuePercent(String queue, + float value); + + public float getNonLabeledQueueCapacity(String queue); + + public void setCapacity(String queue, float capacity); + + public float getNonLabeledQueueMaximumCapacity(String queue); + + public void setMaximumCapacity(String queue, float maxCapacity); + + public void setCapacityByLabel(String queue, String label, float capacity); + + public void setMaximumCapacityByLabel(String queue, String label, + float capacity); + + public int getUserLimit(String queue); + + public void setUserLimit(String queue, int userLimit); + + public float getUserLimitFactor(String queue); + + public void setUserLimitFactor(String queue, float userLimitFactor); + + public QueueState getState(String queue); + + public void setAccessibleNodeLabels(String queue, Set labels); + + public Set getAccessibleNodeLabels(String queue); + + public float getLabeledQueueCapacity(String queue, String label); + + public float getLabeledQueueMaximumCapacity(String queue, String label); + + public String getDefaultNodeLabelExpression(String queue); + + public void setDefaultNodeLabelExpression(String queue, String exp); + + public boolean getReservationContinueLook(); + + public AccessControlList getAcl(String queue, QueueACL acl); + + public void setAcl(String queue, QueueACL acl, String aclString); + + public Map getAcls(String queue); + + public void setAcls(String queue, Map acls); + + public String[] getQueues(String queue); + + public void setQueues(String queue, String[] subQueues); + + public Resource getMinimumAllocation(); + + public Resource getMaximumAllocation(); + + /** + * Get the per queue setting for the maximum limit to allocate to + * each container request. + * + * @param queue + * name of the queue + * @return setting specified per queue else falls back to the cluster setting + */ + public Resource getMaximumAllocationPerQueue(String queue); + + public boolean getEnableUserMetrics(); + + public int getNodeLocalityDelay(); + + public ResourceCalculator getResourceCalculator(); + + public void setResourceCalculator( + Class resourceCalculatorClass); + + public boolean getScheduleAynschronously(); + + public void setScheduleAynschronously(boolean async); + + public boolean getOverrideWithQueueMappings(); + + /** + * Get user/group mappings to queues. + * + * @return user/groups mappings or null on illegal configs + */ + public List getQueueMappings(); + + public boolean isReservable(String queue); + + public void setReservable(String queue, boolean isReservable); + + public long getReservationWindow(String queue); + + public float getAverageCapacity(String queue); + + public float getInstantaneousMaxCapacity(String queue); + + public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity); + + public void setReservationWindow(String queue, long reservationWindow); + + public void setAverageCapacity(String queue, float avgCapacity); + + public String getReservationAdmissionPolicy(String queue); + + public void setReservationAdmissionPolicy(String queue, + String reservationPolicy); + + public String getReservationAgent(String queue); + + public void setReservationAgent(String queue, String reservationPolicy); + + public boolean getShowReservationAsQueues(String queuePath); + + public String getReplanner(String queue); + + public boolean getMoveOnExpiry(String queue); + + public long getEnforcementWindow(String queue); + + /** + * Sets the disable_preemption property in order to indicate + * whether or not container preemption will be disabled for the specified + * queue. + * + * @param queue queue path + * @param preemptionDisabled true if preemption is disabled on queue + */ + public void setPreemptionDisabled(String queue, boolean preemptionDisabled); + + /** + * Indicates whether preemption is disabled on the specified queue. + * + * @param queue queue path to query + * @param defaultVal used as default if the disable_preemption + * is not set in the configuration + * @return true if preemption is disabled on queue, false otherwise + */ + public boolean getPreemptionDisabled(String queue, boolean defaultVal); + + public long getAsyncSchedulerIntervalMs(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSConfUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSConfUtils.java new file mode 100644 index 0000000..aa341ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSConfUtils.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.StringTokenizer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.ImmutableSet; + +public class CSConfUtils { + private static final Log LOG = + LogFactory.getLog(CSConfUtils.class); + + public static Set getAccessibleNodeLabels(String queue, + String accessibleLabelStr) { + // When accessible-label is null, + if (accessibleLabelStr == null) { + // Only return null when queue is not ROOT + if (!queue.equals(AbstractCSConfiguration.ROOT)) { + return null; + } + } else { + // print a warning when accessibleNodeLabel specified in config and queue + // is ROOT + if (queue.equals(AbstractCSConfiguration.ROOT)) { + LOG.warn("Accessible node labels for root queue will be ignored," + + " it will be automatically set to \"*\"."); + } + } + + // always return ANY for queue root + if (queue.equals(AbstractCSConfiguration.ROOT)) { + return ImmutableSet.of(RMNodeLabelsManager.ANY); + } + + // In other cases, split the accessibleLabelStr by "," + Set set = new HashSet(); + for (String str : accessibleLabelStr.split(",")) { + if (!str.trim().isEmpty()) { + set.add(str.trim()); + } + } + + // if labels contains "*", only keep ANY behind + if (set.contains(RMNodeLabelsManager.ANY)) { + set.clear(); + set.add(RMNodeLabelsManager.ANY); + } + return Collections.unmodifiableSet(set); + } + + public static float getAndCheckLabeledQueueCapacity(String queue, + String label, float capacity, float defaultValue) { + if (capacity < AbstractCSConfiguration.MINIMUM_CAPACITY_VALUE + || capacity > AbstractCSConfiguration.MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal capacity of " + capacity + + " for node-label=" + label + " in queue=" + queue + + ", valid capacity should in range of [0, 100]."); + } + if (LOG.isDebugEnabled()) { + LOG.debug("CSConf - getCapacityOfLabel: prefix=" + queue + ", label=" + + label + ", capacity=" + capacity); + } + return capacity; + } + + public static String getAclKey(QueueACL acl) { + return "acl_" + acl.toString().toLowerCase(Locale.ENGLISH); + } + + public static Resource getMaximumAllocationPerQueue(String queue, + int maxAllocationMbPerQueue, int maxAllocationVcoresPerQueue, + Resource clusterMaxAllocation) { + if (LOG.isDebugEnabled()) { + LOG.debug("max alloc mb per queue for " + queue + " is " + + maxAllocationMbPerQueue); + LOG.debug("max alloc vcores per queue for " + queue + " is " + + maxAllocationVcoresPerQueue); + } + if (maxAllocationMbPerQueue == (int) AbstractCSConfiguration.UNDEFINED) { + LOG.info("max alloc mb per queue for " + queue + " is undefined"); + maxAllocationMbPerQueue = clusterMaxAllocation.getMemory(); + } + if (maxAllocationVcoresPerQueue == (int) AbstractCSConfiguration.UNDEFINED) { + LOG.info("max alloc vcore per queue for " + queue + " is undefined"); + maxAllocationVcoresPerQueue = clusterMaxAllocation.getVirtualCores(); + } + Resource result = + Resources.createResource(maxAllocationMbPerQueue, + maxAllocationVcoresPerQueue); + if (maxAllocationMbPerQueue > clusterMaxAllocation.getMemory() + || maxAllocationVcoresPerQueue > clusterMaxAllocation.getVirtualCores()) { + throw new IllegalArgumentException( + "Queue maximum allocation cannot be larger than the cluster setting" + + " for queue " + queue + " max allocation per queue: " + result + + " cluster setting: " + clusterMaxAllocation); + } + return result; + } + + /** + * Returns a collection of strings, trimming leading and trailing whitespeace + * on each value + * + * @param str + * String to parse + * @param delim + * delimiter to separate the values + * @return Collection of parsed elements. + */ + private static Collection getTrimmedStringCollection(String str, + String delim) { + List values = new ArrayList(); + if (str == null) + return values; + StringTokenizer tokenizer = new StringTokenizer(str, delim); + while (tokenizer.hasMoreTokens()) { + String next = tokenizer.nextToken(); + if (next == null || next.trim().isEmpty()) { + continue; + } + values.add(next.trim()); + } + return values; + } + + /** + * Get user/group mappings to queues. + * + * @return user/groups mappings or null on illegal configs + */ + public static List getQueueMappings(String mappingFromConf) { + List mappings = + new ArrayList(); + Collection mappingsString = + getTrimmedStringCollection(mappingFromConf, ","); + for (String mappingValue : mappingsString) { + String[] mapping = + getTrimmedStringCollection(mappingValue, ":") + .toArray(new String[] {}); + if (mapping.length != 3 || mapping[1].length() == 0 + || mapping[2].length() == 0) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + QueueMapping m; + try { + QueueMapping.MappingType mappingType; + if (mapping[0].equals("u")) { + mappingType = QueueMapping.MappingType.USER; + } else if (mapping[0].equals("g")) { + mappingType = QueueMapping.MappingType.GROUP; + } else { + throw new IllegalArgumentException( + "unknown mapping prefix " + mapping[0]); + } + m = new QueueMapping( + mappingType, + mapping[1], + mapping[2]); + } catch (Throwable t) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + if (m != null) { + mappings.add(m); + } + } + + return mappings; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSHierarchyConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSHierarchyConfiguration.java new file mode 100644 index 0000000..bbe52a4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSHierarchyConfiguration.java @@ -0,0 +1,538 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AccessType; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.config.BaseSchedulerHierarchyConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.config.SchedulerConfNode; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.annotations.VisibleForTesting; + +public class CSHierarchyConfiguration extends + BaseSchedulerHierarchyConfiguration implements + AbstractCSConfiguration { + private static final String NODE_LABELS_TAG = "node-labels"; + private static final String NODE_LABEL_TAG = "node-label"; + + private static final Log LOG = + LogFactory.getLog(CSHierarchyConfiguration.class); + + public CSHierarchyConfiguration(String filename) { + super(filename); + } + + @VisibleForTesting + public CSHierarchyConfiguration(SchedulerConfNode root) { + super(root); + } + + @Override + public int getMaximumSystemApplications() { + return getGlobalConfig(MAXIMUM_APPLICATIONS, + DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS, false); + } + + + @Override + public int getMaximumApplicationsPerQueue(String queue) { + return getQueueConfig(queue, MAXIMUM_APPLICATIONS, (int)UNDEFINED, false); + } + + @Override + public float getMaximumApplicationMasterResourcePercent() { + return getGlobalConfig(MAXIMUM_AM_RESOURCE, + DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, false); + } + + @Override + public float getMaximumApplicationMasterResourcePerQueuePercent(String queue) { + return getQueueConfig(queue, MAXIMUM_AM_RESOURCE, + getMaximumApplicationMasterResourcePercent(), false); + } + + @Override + public float getNonLabeledQueueCapacity(String queue) { + float capacity = queue.equals("root") ? 100.0f : + getQueueConfig(queue, CAPACITY, UNDEFINED, true); + if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal " + + "capacity of " + capacity + " for queue " + queue); + } + LOG.debug("CSConf - getCapacity: queuePrefix=" + queue + + ", capacity=" + capacity); + return capacity; + } + + @Override + public void setCapacity(String queue, float capacity) { + if (queue.equals("root")) { + throw new IllegalArgumentException( + "Cannot set capacity, root queue has a fixed capacity of 100.0f"); + } + SchedulerConfNode node = getQueueNode(queue); + node.setValue(CAPACITY, String.valueOf(capacity), true); + LOG.debug("CSConf - setCapacity: queuePrefix=" + queue + + ", capacity=" + capacity); + } + + @Override + public float getNonLabeledQueueMaximumCapacity(String queue) { + float maxCapacity = + getQueueConfig(queue, MAXIMUM_CAPACITY, MAXIMUM_CAPACITY_VALUE, true); + maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ? + MAXIMUM_CAPACITY_VALUE : maxCapacity; + return maxCapacity; + } + + @Override + public void setMaximumCapacity(String queue, float maxCapacity) { + if (maxCapacity > MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal " + "maximum-capacity of " + + maxCapacity + " for queue " + queue); + } + SchedulerConfNode node = getQueueNode(queue); + node.setValue(MAXIMUM_CAPACITY, String.valueOf(maxCapacity), true); + LOG.debug("CSConf - setMaxCapacity: queuePrefix=" + queue + + ", maxCapacity=" + maxCapacity); + } + + /** + * Get queue label node, will throw exception when queue node doesn't exist, + * but will return null when label node doesn't exist + */ + private SchedulerConfNode getQueueLabelNode(String queue, String label) { + SchedulerConfNode queueNode = getQueueNode(queue); + SchedulerConfNode policyPropertiesNode = + queueNode.getChild(SchedulerConfNode.POLICY_PROPERTIES_TAGNAME); + if (null == policyPropertiesNode) { + return null; + } + SchedulerConfNode labelsNode = policyPropertiesNode.getChild(NODE_LABELS_TAG); + if (null == labelsNode) { + return null; + } + SchedulerConfNode labelNode = labelsNode.getChild(NODE_LABEL_TAG, label); + return labelNode; + } + + private SchedulerConfNode getAndCreateQueueLabelNodeIfNotExisted( + String queue, String label) { + SchedulerConfNode queueNode = getQueueNode(queue); + + // Add policy-properties node + SchedulerConfNode policyPropertiesNode = + queueNode.getChild(SchedulerConfNode.POLICY_PROPERTIES_TAGNAME); + if (null == policyPropertiesNode) { + policyPropertiesNode = + queueNode.addChild(SchedulerConfNode.POLICY_PROPERTIES_TAGNAME, ""); + } + + // Add labels node + SchedulerConfNode labelsNode = policyPropertiesNode.getChild(NODE_LABELS_TAG); + if (null == labelsNode) { + labelsNode = policyPropertiesNode.addChild(NODE_LABELS_TAG, ""); + } + + // Add label node + SchedulerConfNode labelNode = labelsNode.getChild(NODE_LABEL_TAG, label); + if (null == labelNode) { + labelNode = labelsNode.addChild(NODE_LABEL_TAG, label); + } + + return labelNode; + } + + @Override + public void setCapacityByLabel(String queue, String label, float capacity) { + SchedulerConfNode labelNode = getAndCreateQueueLabelNodeIfNotExisted(queue, label); + labelNode.setValue(CAPACITY, String.valueOf(capacity), false); + } + + @Override + public void setMaximumCapacityByLabel(String queue, String label, + float capacity) { + SchedulerConfNode labelNode = getAndCreateQueueLabelNodeIfNotExisted(queue, label); + labelNode.setValue(MAXIMUM_CAPACITY, String.valueOf(capacity), false); + } + + @Override + public int getUserLimit(String queue) { + return getQueueConfig(queue, USER_LIMIT, DEFAULT_USER_LIMIT, true); + } + + @Override + public void setUserLimit(String queue, int userLimit) { + SchedulerConfNode queueNode = getQueueNode(queue); + queueNode.setValue(USER_LIMIT, String.valueOf(userLimit), true); + } + + @Override + public float getUserLimitFactor(String queue) { + return getQueueConfig(queue, USER_LIMIT_FACTOR, DEFAULT_USER_LIMIT_FACTOR, true); + } + + @Override + public void setUserLimitFactor(String queue, float userLimitFactor) { + getQueueNode(queue).setValue(USER_LIMIT_FACTOR, + String.valueOf(userLimitFactor), true); + } + + @Override + public QueueState getState(String queue) { + String state = getQueueConfig(queue, STATE, QueueState.RUNNING.name(), false); + return QueueState.valueOf(state); + } + + @Override + public void setAccessibleNodeLabels(String queue, Set labels) { + if (labels == null) { + return; + } + String str = StringUtils.join(",", labels); + getQueueNode(queue).setValue(ACCESSIBLE_NODE_LABELS, str, false); + } + + @Override + public Set getAccessibleNodeLabels(String queue) { + SchedulerConfNode queueNode = getQueueNode(queue); + String accessibleNodeLabelsStr = + queueNode.getValue(ACCESSIBLE_NODE_LABELS, null, false); + return CSConfUtils.getAccessibleNodeLabels(queue, accessibleNodeLabelsStr); + } + + private float internalGetLabeledQueueCapacity(String queue, String label, + String suffix, float defaultValue) { + SchedulerConfNode labelNode = getQueueLabelNode(queue, label); + float capacity = + Float.parseFloat(labelNode.getValue(suffix, String.valueOf(0))); + return CSConfUtils.getAndCheckLabeledQueueCapacity(queue, label, capacity, + defaultValue); + } + + @Override + public float getLabeledQueueCapacity(String queue, String label) { + return internalGetLabeledQueueCapacity(queue, label, CAPACITY, 0f); + } + + @Override + public float getLabeledQueueMaximumCapacity(String queue, String label) { + return internalGetLabeledQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f); + } + + @Override + public String getDefaultNodeLabelExpression(String queue) { + return getQueueConfig(queue, DEFAULT_NODE_LABEL_EXPRESSION, null, true); + } + + @Override + public void setDefaultNodeLabelExpression(String queue, String exp) { + getQueueNode(queue).setValue(DEFAULT_NODE_LABEL_EXPRESSION, exp, true); + } + + @Override + public boolean getReservationContinueLook() { + return getGlobalConfig(RESERVE_CONT_LOOK_ALL_NODES_SUFFIX, + DEFAULT_RESERVE_CONT_LOOK_ALL_NODES, true); + } + + @Override + public AccessControlList getAcl(String queue, QueueACL acl) { + // The root queue defaults to all access if not defined + // Sub queues inherit access if not defined + String defaultAcl = queue.equals(ROOT) ? ALL_ACL : NONE_ACL; + String aclString = + getQueueConfig(queue, CSConfUtils.getAclKey(acl), defaultAcl, false); + return new AccessControlList(aclString); + } + + @Override + public void setAcl(String queue, QueueACL acl, String aclString) { + getQueueNode(queue).setValue(CSConfUtils.getAclKey(acl), aclString, false); + } + + @Override + public Map getAcls(String queue) { + Map acls = + new HashMap(); + for (QueueACL acl : QueueACL.values()) { + acls.put(SchedulerUtils.toAccessType(acl), getAcl(queue, acl)); + } + return acls; + } + + @Override + public void setAcls(String queue, Map acls) { + for (Map.Entry e : acls.entrySet()) { + setAcl(queue, e.getKey(), e.getValue().getAclString()); + } + } + + @Override + public String[] getQueues(String queue) { + return getSubQueueNames(queue).toArray(new String[0]); + } + + @Override + public void setQueues(String queue, String[] subQueues) { + addQueues(queue, subQueues); + } + + @Override + public Resource getMinimumAllocation() { + int minimumMemory = getGlobalConfig( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, false); + int minimumCores = getGlobalConfig( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, false); + return Resources.createResource(minimumMemory, minimumCores); + } + + @Override + public Resource getMaximumAllocation() { + int maximumMemory = getGlobalConfig( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, false); + int maximumCores = getGlobalConfig( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, false); + return Resources.createResource(maximumMemory, maximumCores); + } + + @Override + public Resource getMaximumAllocationPerQueue(String queue) { + int maxAllocationMbPerQueue = + getQueueConfig(queue, MAXIMUM_ALLOCATION_MB, (int) UNDEFINED, false); + int maxAllocationVcoresPerQueue = + getQueueConfig(queue, MAXIMUM_ALLOCATION_VCORES, (int) UNDEFINED, false); + Resource clusterMax = getMaximumAllocation(); + return CSConfUtils.getMaximumAllocationPerQueue(queue, + maxAllocationMbPerQueue, maxAllocationVcoresPerQueue, clusterMax); + } + + @Override + public boolean getEnableUserMetrics() { + return getGlobalConfig(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS, false); + } + + @Override + public int getNodeLocalityDelay() { + int delay = + getGlobalConfig(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY, true); + return (delay == DEFAULT_NODE_LOCALITY_DELAY) ? 0 : delay; + } + + @Override + public ResourceCalculator getResourceCalculator() { + // Since there's bunch of class-related utilities in Configuration, here + // will reuse Hadoop Configuration + String className = + getGlobalConfig(RESOURCE_CALCULATOR_CLASS, + DEFAULT_RESOURCE_CALCULATOR_CLASS.getCanonicalName(), true); + + Configuration tmpConf = new Configuration(false); + tmpConf.set(RESOURCE_CALCULATOR_CLASS, className); + return ReflectionUtils.newInstance( + tmpConf.getClass( + RESOURCE_CALCULATOR_CLASS, + DEFAULT_RESOURCE_CALCULATOR_CLASS, + ResourceCalculator.class), + tmpConf); + } + + @Override + public void setResourceCalculator( + Class resourceCalculatorClass) { + // Since there's bunch of class-related utilities in Configuration, here + // will reuse Hadoop Configuration + Configuration tmpConf = new Configuration(false); + tmpConf.setClass(RESOURCE_CALCULATOR_CLASS, resourceCalculatorClass, + ResourceCalculator.class); + setGlobalConfig(RESOURCE_CALCULATOR_CLASS, + resourceCalculatorClass.getCanonicalName(), true); + } + + @Override + public boolean getScheduleAynschronously() { + return getGlobalConfig(SCHEDULE_ASYNCHRONOUSLY_ENABLE, + DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE, false); + } + + @Override + public void setScheduleAynschronously(boolean async) { + setGlobalConfig(SCHEDULE_ASYNCHRONOUSLY_ENABLE, + String.valueOf(DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE), false); + } + + @Override + public boolean getOverrideWithQueueMappings() { + return getGlobalConfig(ENABLE_QUEUE_MAPPING_OVERRIDE, + DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE, true); + } + + @Override + public List getQueueMappings() { + return CSConfUtils.getQueueMappings(getGlobalConfig(QUEUE_MAPPING, null, true)); + } + + @Override + public boolean isReservable(String queue) { + return getQueueConfig(queue, IS_RESERVABLE, false, true); + } + + @Override + public void setReservable(String queue, boolean isReservable) { + SchedulerConfNode queueNode = getQueueNode(queue); + queueNode.setValue(IS_RESERVABLE, String.valueOf(isReservable), true); + } + + @Override + public long getReservationWindow(String queue) { + return getQueueConfig(queue, RESERVATION_WINDOW, + ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW, true); + } + + @Override + public float getAverageCapacity(String queue) { + return getQueueConfig(queue, AVERAGE_CAPACITY, MAXIMUM_CAPACITY_VALUE, true); + } + + @Override + public float getInstantaneousMaxCapacity(String queue) { + return getQueueConfig(queue, INSTANTANEOUS_MAX_CAPACITY, + MAXIMUM_CAPACITY_VALUE, true); + } + + @Override + public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) { + SchedulerConfNode queueNode = getQueueNode(queue); + queueNode.setValue(INSTANTANEOUS_MAX_CAPACITY, String.valueOf(instMaxCapacity), true); + } + + @Override + public void setReservationWindow(String queue, long reservationWindow) { + SchedulerConfNode queueNode = getQueueNode(queue); + queueNode.setValue(RESERVATION_WINDOW, String + .valueOf(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW), true); + } + + @Override + public void setAverageCapacity(String queue, float avgCapacity) { + SchedulerConfNode queueNode = getQueueNode(queue); + queueNode.setValue(AVERAGE_CAPACITY, String.valueOf(MAXIMUM_CAPACITY_VALUE), true); + } + + @Override + public String getReservationAdmissionPolicy(String queue) { + return getQueueConfig(queue, RESERVATION_ADMISSION_POLICY, + DEFAULT_RESERVATION_ADMISSION_POLICY, true); + } + + @Override + public void setReservationAdmissionPolicy(String queue, + String reservationPolicy) { + getQueueNode(queue).setValue(RESERVATION_ADMISSION_POLICY, + DEFAULT_RESERVATION_ADMISSION_POLICY, true); + } + + @Override + public String getReservationAgent(String queue) { + return getQueueConfig(queue, RESERVATION_ADMISSION_POLICY, + DEFAULT_RESERVATION_ADMISSION_POLICY, true); + } + + @Override + public void setReservationAgent(String queue, String reservationPolicy) { + getQueueNode(queue).setValue(RESERVATION_AGENT_NAME, + reservationPolicy, true); + } + + @Override + public boolean getShowReservationAsQueues(String queuePath) { + return getQueueConfig(queuePath, RESERVATION_SHOW_RESERVATION_AS_QUEUE, + ReservationSchedulerConfiguration.DEFAULT_SHOW_RESERVATIONS_AS_QUEUES, true); + } + + @Override + public String getReplanner(String queue) { + return getQueueConfig(queue, RESERVATION_PLANNER_NAME, + DEFAULT_RESERVATION_PLANNER_NAME, true); + } + + @Override + public boolean getMoveOnExpiry(String queue) { + return getQueueConfig(queue, RESERVATION_MOVE_ON_EXPIRY, + ReservationSchedulerConfiguration.DEFAULT_RESERVATION_MOVE_ON_EXPIRY, true); + } + + @Override + public long getEnforcementWindow(String queue) { + return getQueueConfig( + queue, + RESERVATION_ENFORCEMENT_WINDOW, + ReservationSchedulerConfiguration.DEFAULT_RESERVATION_ENFORCEMENT_WINDOW, true); + } + + @Override + public void setPreemptionDisabled(String queue, boolean preemptionDisabled) { + getQueueNode(queue).setValue(QUEUE_PREEMPTION_DISABLED, + String.valueOf(preemptionDisabled), false); + } + + @Override + public boolean getPreemptionDisabled(String queue, boolean defaultVal) { + return getQueueConfig(queue, QUEUE_PREEMPTION_DISABLED, defaultVal, false); + } + + @Override + public long getAsyncSchedulerIntervalMs() { + return getGlobalConfig(ASYNC_SCHEDULER_INTERVAL, + DEFAULT_ASYNC_SCHEDULER_INTERVAL, false); + } + + @Override + public void setMaximumApplicationsPerQueue(String queue, int value) { + getQueueNode(queue).setValue(MAXIMUM_APPLICATIONS, String.valueOf(value), false); + } + + @Override + public void setMaximumApplicationMasterResourcePerQueuePercent(String queue, + float value) { + getQueueNode(queue).setValue(MAXIMUM_AM_RESOURCE, String.valueOf(value), false); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6b9d846..8dd0e6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -25,6 +25,7 @@ import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -33,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.HashSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -84,12 +85,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -112,11 +115,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; - @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -294,7 +292,10 @@ private synchronized void initScheduler(Configuration configuration) throws this.minimumAllocation = this.conf.getMinimumAllocation(); initMaximumResourceCapability(this.conf.getMaximumAllocation()); this.calculator = this.conf.getResourceCalculator(); - this.usePortForNodeName = this.conf.getUsePortForNodeName(); + this.usePortForNodeName = + conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); this.applications = new ConcurrentHashMap>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 5e6d3eb..3189adf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -19,15 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.StringTokenizer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,16 +38,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.collect.ImmutableSet; - -public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { +public class CapacitySchedulerConfiguration extends Configuration + implements AbstractCSConfiguration, ReservationSchedulerConfiguration { private static final Log LOG = LogFactory.getLog(CapacitySchedulerConfiguration.class); @@ -99,9 +93,6 @@ public static final String STATE = "state"; @Private - public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels"; - - @Private public static final String DEFAULT_NODE_LABEL_EXPRESSION = "default-node-label-expression"; @@ -192,35 +183,6 @@ @Private public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; - - @Private - public static class QueueMapping { - - public enum MappingType { - - USER("u"), - GROUP("g"); - private final String type; - private MappingType(String type) { - this.type = type; - } - - public String toString() { - return type; - } - - }; - - MappingType type; - String source; - String queue; - - public QueueMapping(MappingType type, String source, String queue) { - this.type = type; - this.source = source; - this.queue = queue; - } - } @Private public static final String AVERAGE_CAPACITY = "average-capacity"; @@ -256,6 +218,11 @@ public QueueMapping(MappingType type, String source, String queue) { @Private public static final String RESERVATION_ENFORCEMENT_WINDOW = "reservation-enforcement-window"; + + @Private + public static final String ASYNC_SCHEDULER_INTERVAL = + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms"; public CapacitySchedulerConfiguration() { this(new Configuration()); @@ -394,9 +361,8 @@ public void setUserLimitFactor(String queue, float userLimitFactor) { public QueueState getState(String queue) { String state = get(getQueuePrefix(queue) + STATE); - return (state != null) ? - QueueState.valueOf(state.toUpperCase(Locale.ENGLISH)) : - QueueState.RUNNING; + return (state != null) ? QueueState.valueOf(state + .toUpperCase(Locale.ENGLISH)) : QueueState.RUNNING; } public void setAccessibleNodeLabels(String queue, Set labels) { @@ -411,57 +377,15 @@ public void setAccessibleNodeLabels(String queue, Set labels) { String accessibleLabelStr = get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS); - // When accessible-label is null, - if (accessibleLabelStr == null) { - // Only return null when queue is not ROOT - if (!queue.equals(ROOT)) { - return null; - } - } else { - // print a warning when accessibleNodeLabel specified in config and queue - // is ROOT - if (queue.equals(ROOT)) { - LOG.warn("Accessible node labels for root queue will be ignored," - + " it will be automatically set to \"*\"."); - } - } - - // always return ANY for queue root - if (queue.equals(ROOT)) { - return ImmutableSet.of(RMNodeLabelsManager.ANY); - } - - // In other cases, split the accessibleLabelStr by "," - Set set = new HashSet(); - for (String str : accessibleLabelStr.split(",")) { - if (!str.trim().isEmpty()) { - set.add(str.trim()); - } - } - - // if labels contains "*", only keep ANY behind - if (set.contains(RMNodeLabelsManager.ANY)) { - set.clear(); - set.add(RMNodeLabelsManager.ANY); - } - return Collections.unmodifiableSet(set); + return CSConfUtils.getAccessibleNodeLabels(queue, accessibleLabelStr); } - private float internalGetLabeledQueueCapacity(String queue, String label, String suffix, - float defaultValue) { + private float internalGetLabeledQueueCapacity(String queue, String label, + String suffix, float defaultValue) { String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix; float capacity = getFloat(capacityPropertyName, defaultValue); - if (capacity < MINIMUM_CAPACITY_VALUE - || capacity > MAXIMUM_CAPACITY_VALUE) { - throw new IllegalArgumentException("Illegal capacity of " + capacity - + " for node-label=" + label + " in queue=" + queue - + ", valid capacity should in range of [0, 100]."); - } - if (LOG.isDebugEnabled()) { - LOG.debug("CSConf - getCapacityOfLabel: prefix=" - + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity); - } - return capacity; + return CSConfUtils.getAndCheckLabeledQueueCapacity(queue, label, capacity, + defaultValue); } public float getLabeledQueueCapacity(String queue, String label) { @@ -490,23 +414,19 @@ public boolean getReservationContinueLook() { return getBoolean(RESERVE_CONT_LOOK_ALL_NODES, DEFAULT_RESERVE_CONT_LOOK_ALL_NODES); } - - private static String getAclKey(QueueACL acl) { - return "acl_" + acl.toString().toLowerCase(Locale.ENGLISH); - } public AccessControlList getAcl(String queue, QueueACL acl) { String queuePrefix = getQueuePrefix(queue); // The root queue defaults to all access if not defined // Sub queues inherit access if not defined String defaultAcl = queue.equals(ROOT) ? ALL_ACL : NONE_ACL; - String aclString = get(queuePrefix + getAclKey(acl), defaultAcl); + String aclString = get(queuePrefix + CSConfUtils.getAclKey(acl), defaultAcl); return new AccessControlList(aclString); } public void setAcl(String queue, QueueACL acl, String aclString) { String queuePrefix = getQueuePrefix(queue); - set(queuePrefix + getAclKey(acl), aclString); + set(queuePrefix + CSConfUtils.getAclKey(acl), aclString); } public Map getAcls(String queue) { @@ -576,36 +496,13 @@ public Resource getMaximumAllocation() { */ public Resource getMaximumAllocationPerQueue(String queue) { String queuePrefix = getQueuePrefix(queue); - int maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, - (int)UNDEFINED); - int maxAllocationVcoresPerQueue = getInt( - queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED); - if (LOG.isDebugEnabled()) { - LOG.debug("max alloc mb per queue for " + queue + " is " - + maxAllocationMbPerQueue); - LOG.debug("max alloc vcores per queue for " + queue + " is " - + maxAllocationVcoresPerQueue); - } + int maxAllocationMbPerQueue = + getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int) UNDEFINED); + int maxAllocationVcoresPerQueue = + getInt(queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int) UNDEFINED); Resource clusterMax = getMaximumAllocation(); - if (maxAllocationMbPerQueue == (int)UNDEFINED) { - LOG.info("max alloc mb per queue for " + queue + " is undefined"); - maxAllocationMbPerQueue = clusterMax.getMemory(); - } - if (maxAllocationVcoresPerQueue == (int)UNDEFINED) { - LOG.info("max alloc vcore per queue for " + queue + " is undefined"); - maxAllocationVcoresPerQueue = clusterMax.getVirtualCores(); - } - Resource result = Resources.createResource(maxAllocationMbPerQueue, - maxAllocationVcoresPerQueue); - if (maxAllocationMbPerQueue > clusterMax.getMemory() - || maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) { - throw new IllegalArgumentException( - "Queue maximum allocation cannot be larger than the cluster setting" - + " for queue " + queue - + " max allocation per queue: " + result - + " cluster setting: " + clusterMax); - } - return result; + return CSConfUtils.getMaximumAllocationPerQueue(queue, + maxAllocationMbPerQueue, maxAllocationVcoresPerQueue, clusterMax); } public boolean getEnableUserMetrics() { @@ -626,12 +523,7 @@ public ResourceCalculator getResourceCalculator() { this); } - public boolean getUsePortForNodeName() { - return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, - YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); - } - - public void setResourceComparator( + public void setResourceCalculator( Class resourceCalculatorClass) { setClass( RESOURCE_CALCULATOR_CLASS, @@ -652,32 +544,8 @@ public boolean getOverrideWithQueueMappings() { return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); } + - /** - * Returns a collection of strings, trimming leading and trailing whitespeace - * on each value - * - * @param str - * String to parse - * @param delim - * delimiter to separate the values - * @return Collection of parsed elements. - */ - private static Collection getTrimmedStringCollection(String str, - String delim) { - List values = new ArrayList(); - if (str == null) - return values; - StringTokenizer tokenizer = new StringTokenizer(str, delim); - while (tokenizer.hasMoreTokens()) { - String next = tokenizer.nextToken(); - if (next == null || next.trim().isEmpty()) { - continue; - } - values.add(next.trim()); - } - return values; - } /** * Get user/group mappings to queues. @@ -685,46 +553,7 @@ public boolean getOverrideWithQueueMappings() { * @return user/groups mappings or null on illegal configs */ public List getQueueMappings() { - List mappings = - new ArrayList(); - Collection mappingsString = - getTrimmedStringCollection(QUEUE_MAPPING); - for (String mappingValue : mappingsString) { - String[] mapping = - getTrimmedStringCollection(mappingValue, ":") - .toArray(new String[] {}); - if (mapping.length != 3 || mapping[1].length() == 0 - || mapping[2].length() == 0) { - throw new IllegalArgumentException( - "Illegal queue mapping " + mappingValue); - } - - QueueMapping m; - try { - QueueMapping.MappingType mappingType; - if (mapping[0].equals("u")) { - mappingType = QueueMapping.MappingType.USER; - } else if (mapping[0].equals("g")) { - mappingType = QueueMapping.MappingType.GROUP; - } else { - throw new IllegalArgumentException( - "unknown mapping prefix " + mapping[0]); - } - m = new QueueMapping( - mappingType, - mapping[1], - mapping[2]); - } catch (Throwable t) { - throw new IllegalArgumentException( - "Illegal queue mapping " + mappingValue); - } - - if (m != null) { - mappings.add(m); - } - } - - return mappings; + return CSConfUtils.getQueueMappings(get(QUEUE_MAPPING)); } public boolean isReservable(String queue) { @@ -743,7 +572,7 @@ public void setReservable(String queue, boolean isReservable) { public long getReservationWindow(String queue) { long reservationWindow = getLong(getQueuePrefix(queue) + RESERVATION_WINDOW, - DEFAULT_RESERVATION_WINDOW); + ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW); return reservationWindow; } @@ -806,7 +635,7 @@ public boolean getShowReservationAsQueues(String queuePath) { boolean showReservationAsQueues = getBoolean(getQueuePrefix(queuePath) + RESERVATION_SHOW_RESERVATION_AS_QUEUE, - DEFAULT_SHOW_RESERVATIONS_AS_QUEUES); + ReservationSchedulerConfiguration.DEFAULT_SHOW_RESERVATIONS_AS_QUEUES); return showReservationAsQueues; } @@ -822,7 +651,7 @@ public String getReplanner(String queue) { public boolean getMoveOnExpiry(String queue) { boolean killOnExpiry = getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY, - DEFAULT_RESERVATION_MOVE_ON_EXPIRY); + ReservationSchedulerConfiguration.DEFAULT_RESERVATION_MOVE_ON_EXPIRY); return killOnExpiry; } @@ -830,7 +659,7 @@ public boolean getMoveOnExpiry(String queue) { public long getEnforcementWindow(String queue) { long enforcementWindow = getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW, - DEFAULT_RESERVATION_ENFORCEMENT_WINDOW); + ReservationSchedulerConfiguration.DEFAULT_RESERVATION_ENFORCEMENT_WINDOW); return enforcementWindow; } @@ -861,4 +690,22 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) { defaultVal); return preemptionDisabled; } -} + + @Override + public long getAsyncSchedulerIntervalMs() { + return getLong(ASYNC_SCHEDULER_INTERVAL, + DEFAULT_ASYNC_SCHEDULER_INTERVAL); + } + + @Override + public void setMaximumApplicationsPerQueue(String queue, int value) { + setInt(getQueuePrefix(queue) + MAXIMUM_APPLICATIONS, value); + } + + @Override + public void setMaximumApplicationMasterResourcePerQueuePercent(String queue, + float value) { + setFloat(getQueuePrefix(queue) + + MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, value); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueMapping.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueMapping.java new file mode 100644 index 0000000..a403903 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueMapping.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +public class QueueMapping { + + public enum MappingType { + + USER("u"), + GROUP("g"); + private final String type; + private MappingType(String type) { + this.type = type; + } + + public String toString() { + return type; + } + + }; + + MappingType type; + String source; + String queue; + + public QueueMapping(MappingType type, String source, String queue) { + this.type = type; + this.source = source; + this.queue = queue; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/BaseSchedulerHierarchyConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/BaseSchedulerHierarchyConfiguration.java new file mode 100644 index 0000000..2a61573 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/BaseSchedulerHierarchyConfiguration.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.config; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URL; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Sets; + +public class BaseSchedulerHierarchyConfiguration { + private static final Log LOG = + LogFactory.getLog(BaseSchedulerHierarchyConfiguration.class); + + private static final String QUEUES_TAG = "queues"; + private static final String QUEUE_TAG = "queue"; + private static final String ROOT_QUEUE_NAME = "root"; + private static final String DOT = "."; + + protected SchedulerConfNode root; + private Map pathToQueueNodes = + new HashMap(); + + private File getSchedulerConfFile(String schedulerConfFilePath) { + File allocFile = new File(schedulerConfFilePath); + if (!allocFile.isAbsolute()) { + URL url = Thread.currentThread().getContextClassLoader() + .getResource(schedulerConfFilePath); + if (url == null) { + LOG.warn(schedulerConfFilePath + " not found on the classpath."); + allocFile = null; + } else if (!url.getProtocol().equalsIgnoreCase("file")) { + throw new RuntimeException("Allocation file " + url + + " found on the classpath is not on the local filesystem."); + } else { + allocFile = new File(url.getPath()); + } + } + return allocFile; + } + + public BaseSchedulerHierarchyConfiguration(String filename) { + try { + FileInputStream is = new FileInputStream(getSchedulerConfFile(filename)); + this.root = SchedulerConfParser.parse(is); + is.close(); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + + init(); + } + + @VisibleForTesting + public BaseSchedulerHierarchyConfiguration(SchedulerConfNode root) { + this.root = root; + + init(); + } + + public int getGlobalConfig(String optionName, int defaultValue, boolean policyProperty) { + return Integer.parseInt(root.getValue(optionName, + String.valueOf(defaultValue), policyProperty)); + } + + public float getGlobalConfig(String optionName, float defaultValue, boolean policyProperty) { + return Float.parseFloat(root.getValue(optionName, + String.valueOf(defaultValue), policyProperty)); + } + + public String getGlobalConfig(String optionName, String defaultValue, boolean policyProperty) { + return root.getValue(optionName, defaultValue, policyProperty); + } + + public boolean getGlobalConfig(String optionName, boolean defaultValue, boolean policyProperty) { + return Boolean.parseBoolean(root.getValue(optionName, + String.valueOf(defaultValue), policyProperty)); + } + + public long getGlobalConfig(String optionName, long defaultValue, boolean policyProperty) { + return Long.parseLong(root.getValue(optionName, + String.valueOf(defaultValue), policyProperty)); + } + + public void setGlobalConfig(String optionName, String value, boolean policyProperty) { + root.setValue(optionName, value, policyProperty); + } + + protected void init() { + initQueues(); + } + + private void addQueues(SchedulerConfNode cur, String path) { + if (cur.getName().isEmpty()) { + throw new YarnRuntimeException("Queue's name couldn't be empty"); + } + + if (path.isEmpty()) { + path = cur.getName(); + } else { + path = path + DOT + cur.getName(); + } + pathToQueueNodes.put(path, cur); + + // add children + SchedulerConfNode queuesNode; + if (null != (queuesNode = cur.getChild(QUEUES_TAG))) { + List queues = queuesNode.getChildren(QUEUE_TAG); + // use a set to check if there are any duplicated queue names under the + // parent + Set names = new HashSet(); + for (SchedulerConfNode node : queues) { + if (!names.add(node.getName())) { + throw new YarnRuntimeException("duplicated queue has name=" + + node.getName() + " is already included by parent=" + + cur.getName()); + } + addQueues(node, path); + } + } + } + + private void initQueues() { + // Get root queue + SchedulerConfNode rootQueueNode = root.getChild(QUEUE_TAG); + if (null == rootQueueNode + || !rootQueueNode.getName().equals(ROOT_QUEUE_NAME)) { + throw new YarnRuntimeException("Failed to get root queue"); + } + + addQueues(rootQueueNode, ""); + } + + protected SchedulerConfNode getQueueNode(String queuePath) { + // Get queue conf node + SchedulerConfNode queueNode = pathToQueueNodes.get(queuePath); + if (null == queueNode) { + throw new YarnRuntimeException("Failed to get queue node with path=" + + queuePath); + } + return queueNode; + } + + public Collection getSubQueues(String queuePath) { + SchedulerConfNode queueNode = getQueueNode(queuePath); + SchedulerConfNode queuesNode = queueNode.getChild(QUEUES_TAG); + if (null == queuesNode) { + return Collections.emptyList(); + } + return queuesNode.getChildren(QUEUE_TAG); + } + + public Collection getSubQueueNames(String queuePath) { + Collection subQueueNames = + Collections2.transform(getSubQueues(queuePath), + new Function() { + @Override + public String apply(SchedulerConfNode input) { + return input.getName(); + } + }); + return subQueueNames; + } + + public void addQueues(String parentQueuePath, String[] children) { + SchedulerConfNode queueNode = getQueueNode(parentQueuePath); + SchedulerConfNode queuesNode = queueNode.getChild(QUEUES_TAG); + if (null == queuesNode) { + queueNode = queueNode.addChild(QUEUES_TAG, ""); + } + + // Check duplications + Set set = Sets.newHashSet(children); + if (set.size() < children.length || set.contains("")) { + throw new IllegalArgumentException( + "duplicated or empty named queue found in children"); + } + + for (String name : children) { + queueNode.addChild(QUEUE_TAG, name); + } + } + + public int getQueueConfig(String queuePath, String optionName, + int defaultValue, boolean policyProperty) { + return getQueueConfig(queuePath, optionName, defaultValue, policyProperty, + false); + } + + public int getQueueConfig(String queuePath, String optionName, + int defaultValue, boolean policyProperty, boolean inherit) { + SchedulerConfNode queueNode = getQueueNode(queuePath); + return Integer.parseInt(queueNode.getValue(optionName, + String.valueOf(defaultValue), policyProperty, inherit)); + } + + public boolean getQueueConfig(String queuePath, String optionName, + boolean defaultValue, boolean policyProperty) { + return Boolean.parseBoolean(getQueueConfig(queuePath, optionName, + String.valueOf(defaultValue), policyProperty, false)); + } + + public long getQueueConfig(String queuePath, String optionName, + long defaultValue, boolean policyProperty) { + return Long.parseLong(getQueueConfig(queuePath, optionName, + String.valueOf(defaultValue), policyProperty, false)); + } + + public float getQueueConfig(String queuePath, String optionName, + float defaultValue, boolean policyProperty) { + return getQueueConfig(queuePath, optionName, defaultValue, policyProperty, + false); + } + + public float getQueueConfig(String queuePath, String optionName, + float defaultValue, boolean policyProperty, boolean inherit) { + SchedulerConfNode queueNode = getQueueNode(queuePath); + return Float.parseFloat(queueNode.getValue(optionName, + String.valueOf(defaultValue), policyProperty, inherit)); + } + + public String getQueueConfig(String queuePath, String optionName, + String defaultValue, boolean policyProperty) { + return getQueueConfig(queuePath, optionName, defaultValue, policyProperty, + false); + } + + public String getQueueConfig(String queuePath, String optionName, + String defaultValue, boolean policyProperty, boolean inherit) { + SchedulerConfNode queueNode = getQueueNode(queuePath); + return queueNode.getValue(optionName, defaultValue, policyProperty, inherit); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/SchedulerConfNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/SchedulerConfNode.java new file mode 100644 index 0000000..0d21652 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/SchedulerConfNode.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.config; + +import java.util.Collections; +import java.util.List; + +import org.apache.commons.lang.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; + +/** + * A "node" in scheduler's configuration file, it could be a queue, a user, etc. + * Generally, it should be a hierarchy + */ +public class SchedulerConfNode { + public static final String POLICY_PROPERTIES_TAGNAME = "policy-properties"; + + private SchedulerConfNode parent; + private ListMultimap fields; + + /** + * Following fields in this class should be read only + */ + String name; + String value; + String tag; + + public SchedulerConfNode(String name, String value, String tag, + ListMultimap fields) { + this.name = name; + if (null == value) { + this.value = ""; + } else { + this.value = value; + } + this.tag = tag; + if (null == fields) { + this.fields = ArrayListMultimap.create(); + } else { + this.fields = fields; + } + parent = null; + } + + public String getName() { + return name; + } + + public String getTag() { + return tag; + } + + public void setParent(SchedulerConfNode parent) { + this.parent = parent; + } + + public SchedulerConfNode getParent() { + return this.parent; + } + + public void setFields(ListMultimap fields) { + if (fields == null) { + this.fields = ArrayListMultimap.create(); + } else { + this.fields = fields; + } + } + + /** + * Get children with given tag name, return null if no children related to + * given tagName + */ + public List getChildren(String tagName) { + if (null == fields.get(tagName) || fields.get(tagName).isEmpty()) { + return null; + } + return Collections.unmodifiableList(fields.get(tagName)); + } + + /** + * Get child with given tagName, if multiple children existed, return the + * first one, return null if no children existed. + */ + public SchedulerConfNode getChild(String tagName) { + List list = getChildren(tagName); + if (list == null || list.isEmpty()) { + return null; + } + return list.get(0); + } + + /** + * Get child with given tagName, if multiple children existed, return the + * first one, return null if no children existed. + */ + public SchedulerConfNode getChild(String tagName, String name) { + List list = getChildren(tagName); + if (list == null || list.isEmpty()) { + return null; + } + + for (SchedulerConfNode n : list) { + if (n.getName().equals(name)) { + return n; + } + } + + return null; + } + + /** + * Return value of this element itself. + * + * value + * + * Call getValue on now will return "value" + */ + public String getValue() { + return value; + } + + public String getValue(String tagName) { + return getValue(tagName, null); + } + + public String getValue(String tagName, String defaultValue) { + return getValue(tagName, defaultValue, false); + } + + public String getValue(String tagName, String defaultValue, + boolean policyProperty) { + return getValue(tagName, defaultValue, policyProperty, false); + } + + /** + * Return value of direct child with given tagName, return empty string if no + * value; return null if no element with given tagName existed. + * + *
+   * {@code
+   * 
+   *  value1 
+   * 
+   * }
+   * 
+ * + * Call getValue("option1") will return value1. If no tag , + * defaultValue will be returned + * + * When inherit=true, it will try to find lowest ancestor with the option, + * such as: + * + *
+   * {@code
+   * 
+   *    value1
+   *    
+   *       value2
+   *       
+   *       
+   *    
+   * 
+   * }
+   * 
+ * + * Execute getValue("key", inherit=true) on "now" will returns value2 + */ + public String getValue(String tagName, String defaultValue, + boolean policyProperty, boolean inherit) { + if (!policyProperty) { + // First to getValue on this node, like + //
+      // {@code
+      // 
+      //     value
+      // 
+      // }
+      // 
+ SchedulerConfNode node = getChild(tagName); + if (null != node) { + return node.getValue(); + } + } else { + // If it's a policy property, we will get it from + //
+      // {@code
+      // 
+      //     
+      //         value
+      //     
+      // 
+      // }
+      // 
+ SchedulerConfNode policyPropertiesNode = getChild(POLICY_PROPERTIES_TAGNAME); + if (null != policyPropertiesNode && null != policyPropertiesNode.getChild(tagName)) { + return policyPropertiesNode.getChild(tagName).getValue(); + } + } + + if (inherit) { + // When inherit enabled: + if (null == parent) { + // We're root, try to get value directly + return getValue(tagName, defaultValue, policyProperty, false); + } else { + // We're not root, try to inherit value from parent + return parent.getValue(tagName, defaultValue, policyProperty, true); + } + } else { + return defaultValue; + } + } + + public void setValue(String value) { + this.value = value; + } + + public void setValue(String tagName, String value, boolean policyProperty) { + if (!policyProperty) { + SchedulerConfNode node = getChild(tagName); + if (null != node) { + node.setValue(value); + } else { + fields + .put(tagName, new SchedulerConfNode("", value, tagName, null)); + } + } else { + // trying to add policyProperties node + SchedulerConfNode policyPropertiesNode = + getChild(POLICY_PROPERTIES_TAGNAME); + if (null == policyPropertiesNode) { + policyPropertiesNode = + new SchedulerConfNode("", "", POLICY_PROPERTIES_TAGNAME, null); + fields.put(POLICY_PROPERTIES_TAGNAME, policyPropertiesNode); + } + policyPropertiesNode.setValue(tagName, value, false); + } + } + + public SchedulerConfNode addChild(String tagName, String name) { + SchedulerConfNode child = new SchedulerConfNode(name, "", tagName, null); + fields.put(tagName, child); + return child; + } + + public boolean isLeafNode() { + return fields.isEmpty(); + } + + @VisibleForTesting + public ListMultimap getFields() { + return fields; + } + + private int getDepth() { + int hierarchy = 0; + SchedulerConfNode p = parent; + while (null != p) { + hierarchy ++; + p = p.parent; + } + return hierarchy; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + String indent = StringUtils.repeat(" ", getDepth() * 4); + sb.append(indent + "tag=" + tag + ", name=" + name + ", value=" + value + + " children=[\n"); + for (String key : fields.keySet()) { + for (SchedulerConfNode node : fields.get(key)) { + sb.append(node.toString()); + } + } + return sb.toString(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/SchedulerConfParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/SchedulerConfParser.java new file mode 100644 index 0000000..2472f20 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/SchedulerConfParser.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.config; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; +import org.xml.sax.SAXException; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; + +/** + * Parse hierarchy scheduler configuration file + */ +public class SchedulerConfParser { + private static final String NAME = "name"; + + public static SchedulerConfNode parse(String content) throws YarnRuntimeException { + try { + Document doc = parseDocument(content); + return internalParse(doc.getDocumentElement()); + } catch (ParserConfigurationException | SAXException | IOException e) { + throw new YarnRuntimeException(e); + } + } + + public static SchedulerConfNode parse(InputStream is) throws YarnRuntimeException { + try { + Document doc = parseDocument(is); + return internalParse(doc.getDocumentElement()); + } catch (ParserConfigurationException | SAXException | IOException e) { + throw new YarnRuntimeException(e); + } + } + + public SchedulerConfNode parse(File file) throws YarnRuntimeException { + try { + Document doc = parseDocument(file); + return internalParse(doc.getDocumentElement()); + } catch (ParserConfigurationException | SAXException | IOException e) { + throw new YarnRuntimeException(e); + } + } + + private static SchedulerConfNode internalParse(Element current) { + // Get name + String name = current.getAttribute(NAME); + + // Get tag + String tag = current.getTagName(); + + // Get value + String value = ""; + + // Initial fields + ListMultimap fields = ArrayListMultimap.create(); + List childrenElements = new ArrayList(); + NodeList nodes = current.getChildNodes(); + + if (nodes.getLength() > 0) { + for (int i = 0; i < nodes.getLength(); i++) { + Node node = nodes.item(i); + if (!(node instanceof Element)) { + continue; + } + Element element = (Element) node; + childrenElements.add(element); + SchedulerConfNode childConfNode = internalParse(element); + fields.put(element.getTagName(), childConfNode); + } + } + + if (childrenElements.isEmpty()) { + if (current.getFirstChild() != null) { + // Value will only be set when this element has no child + value = ((Text) current.getFirstChild()).getData().trim(); + } + } + + SchedulerConfNode now = + new SchedulerConfNode(name, value, tag, fields); + + // setParent for all children + for (SchedulerConfNode child : fields.values()) { + child.setParent(now); + } + + return now; + } + + private static DocumentBuilder getDocumentBuilder() + throws ParserConfigurationException { + DocumentBuilderFactory docBuilderFactor = + DocumentBuilderFactory.newInstance(); + docBuilderFactor.setIgnoringComments(true); + DocumentBuilder builder = docBuilderFactor.newDocumentBuilder(); + return builder; + } + + private static Document parseDocument(String content) + throws ParserConfigurationException, SAXException, IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(content.getBytes()); + return parseDocument(bais); + } + + private static Document parseDocument(InputStream is) + throws ParserConfigurationException, SAXException, IOException { + DocumentBuilder builder = getDocumentBuilder(); + Document doc = builder.parse(is); + return doc; + } + + private static Document parseDocument(File file) + throws ParserConfigurationException, SAXException, IOException { + DocumentBuilder builder = getDocumentBuilder(); + Document doc = builder.parse(file); + return doc; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 9cb767d..f10c554 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -36,7 +36,7 @@ import javax.annotation.concurrent.ThreadSafe; @ThreadSafe -public class AllocationConfiguration extends ReservationSchedulerConfiguration { +public class AllocationConfiguration implements ReservationSchedulerConfiguration { private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index d1f0ede..f8ee42c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -273,7 +273,7 @@ public void testResourceTypes() throws Exception { CapacitySchedulerConfiguration csconf = new CapacitySchedulerConfiguration(); - csconf.setResourceComparator(DominantResourceCalculator.class); + csconf.setResourceCalculator(DominantResourceCalculator.class); YarnConfiguration testCapacityDRConf = new YarnConfiguration(csconf); testCapacityDRConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSHierarchyConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSHierarchyConfiguration.java new file mode 100644 index 0000000..18aa56f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSHierarchyConfiguration.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.junit.Assert; +import org.junit.Test; + +public class TestCSHierarchyConfiguration { + @Test + public void testBasicCSHierarchyConfiguration() { + CSHierarchyConfiguration conf = + new CSHierarchyConfiguration("test-capacity-scheduler-hierarchy.xml"); + + // Check global configurations + Assert.assertEquals(9999, conf.getMaximumSystemApplications()); + Assert.assertEquals(0.3f, + conf.getMaximumApplicationMasterResourcePercent(), 1e-6); + + // Check global-policy configurations + Assert.assertEquals(DominantResourceCalculator.class.getName(), conf + .getResourceCalculator().getClass().getName()); + + // Check queue (root) configurations + Assert.assertEquals(100f, conf.getNonLabeledQueueCapacity("root"), 1e-6); + + // Check queue (root.default) configurations + Assert.assertEquals(50f, conf.getNonLabeledQueueCapacity("root.default"), + 1e-6); + + // Check queue (root.default) accessible node labels + Assert.assertTrue(conf.getAccessibleNodeLabels("root.default").contains("x")); + + // Check node label configurations + Assert.assertEquals(20f, conf.getLabeledQueueCapacity("root.default", "x"), + 1e-6); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index fabf47d..16ec7ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -2019,7 +2019,7 @@ public void testKillAllAppsInvalidSource() throws Exception { public void testAppReservationWithDominantResourceCalculator() throws Exception { CapacitySchedulerConfiguration csconf = new CapacitySchedulerConfiguration(); - csconf.setResourceComparator(DominantResourceCalculator.class); + csconf.setResourceCalculator(DominantResourceCalculator.class); YarnConfiguration conf = new YarnConfiguration(csconf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/TestBaseSchedulerHierarchyConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/TestBaseSchedulerHierarchyConfiguration.java new file mode 100644 index 0000000..b2f7ce7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/TestBaseSchedulerHierarchyConfiguration.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.config; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.Assert; +import org.junit.Test; + +public class TestBaseSchedulerHierarchyConfiguration { + @Test + public void testBaseSchedulerHierarchyConfiguration() throws YarnException { + String content = "" + + "" + + " " + + " " + + " " + + " a-2" + + " " + + " " + + " a1-1" + + " " + + " a1-1" + + " a1-3" + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + + " global-1" + + " global-3" + + + " " + + " global-1" + + " global-2" + + " " + + ""; + + SchedulerConfNode root = SchedulerConfParser.parse(content); + BaseSchedulerHierarchyConfiguration conf = + new BaseSchedulerHierarchyConfiguration(root); + + Assert + .assertEquals("global-1", conf.getGlobalConfig("param1", null, false)); + Assert.assertEquals(null, conf.getGlobalConfig("param2", null, false)); + + // root.a doesn't have param1 specify + Assert.assertEquals(null, + conf.getQueueConfig("root.a", "param1", null, false)); + + // root.a param1 will inherit global-1 + Assert.assertEquals("global-1", + conf.getQueueConfig("root.a", "param1", null, false, true)); + + // root.a.a1 has param1 + Assert.assertEquals("a1-1", + conf.getQueueConfig("root.a.a1", "param1", null, false)); + + // root.a.a1 param2 will inherit root.a + Assert.assertEquals("a-2", + conf.getQueueConfig("root.a.a1", "param2", null, false, true)); + + // root.b will inherit global as well + Assert.assertEquals("global-3", + conf.getQueueConfig("root.b", "param3", null, false, true)); + + // check specifying + Assert.assertEquals("global-1", + conf.getGlobalConfig("policy-1", null, true)); + Assert.assertEquals("a1-1", + conf.getQueueConfig("root.a.a1", "policy-1", null, true)); + Assert.assertEquals("global-2", + conf.getQueueConfig("root.a.a1", "policy-2", null, true, true)); + + try { + conf.getQueueConfig("root.a.a3", "param1", null, false); + Assert.fail("try to fetch an non-existed queue, should fail"); + } catch (YarnRuntimeException e) { + // expected + } + + try { + conf.getQueueConfig("root.c", "param1", null, false); + Assert.fail("try to fetch an non-existed queue, should fail"); + } catch (YarnRuntimeException e) { + // expected + } + + try { + conf.getQueueConfig("non-existed", "param1", null, false, false); + Assert.fail("try to fetch an non-existed queue, should fail"); + } catch (YarnRuntimeException e) { + // expected + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/TestSchedulerConfParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/TestSchedulerConfParser.java new file mode 100644 index 0000000..d7ce709 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/config/TestSchedulerConfParser.java @@ -0,0 +1,117 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.config; + +import java.util.Iterator; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; + +public class TestSchedulerConfParser { + @Test + public void testSimpleSchedulerConfParser() throws YarnException { + String content = "" + + "" + + " " + + " " + + " " + + " " + + " value3 " + + " value4" + + " " + + " " + + " value5" + + " value6" + + " " + + " " + + " " + + " " + + " " + + " value1" + + " " + + " value2" + + ""; + + SchedulerConfNode root = SchedulerConfParser.parse(content); + SchedulerConfNode expectedRoot = getExpectedSimpleSchedulerConfNode(); + + assertEquals(expectedRoot, root); + } + + public void assertEquals(SchedulerConfNode left, SchedulerConfNode right) { + Assert.assertEquals(left.getTag(), right.getTag()); + Assert.assertEquals(left.getName(), right.getName()); + Assert.assertEquals(left.getValue(), right.getValue()); + Assert.assertEquals(left.getFields().values().size(), right.getFields().values().size()); + + Iterator leftIterator = + left.getFields().values().iterator(); + Iterator rightIterator = + right.getFields().values().iterator(); + while (leftIterator.hasNext()) { + assertEquals(leftIterator.next(), rightIterator.next()); + } + } + + private SchedulerConfNode getExpectedSimpleSchedulerConfNode() { + ListMultimap fields = ArrayListMultimap.create(); + + // leaf1 + fields.put("param3", new SchedulerConfNode("", "value3", "param3", null)); + fields.put("param4", new SchedulerConfNode("", "value4", "param4", null)); + SchedulerConfNode leaf1 = new SchedulerConfNode("leaf1", "", "leaf", fields); + + // leaf2 + fields = ArrayListMultimap.create(); + fields.put("param5", new SchedulerConfNode("", "value5", "param5", null)); + fields.put("param6", new SchedulerConfNode("", "value6", "param6", null)); + SchedulerConfNode leaf2 = new SchedulerConfNode("", "", "leaf", fields); + + // inner children + fields = ArrayListMultimap.create(); + fields.put("leaf", leaf1); + fields.put("leaf", leaf2); + SchedulerConfNode innerChildren = + new SchedulerConfNode("", "", "children", fields); + + // parent + fields = ArrayListMultimap.create(); + fields.put("children", innerChildren); + SchedulerConfNode parent = + new SchedulerConfNode("parent", "", "parent", fields); + + // outer children + fields = ArrayListMultimap.create(); + fields.put("parent", parent); + SchedulerConfNode outerChildren = new SchedulerConfNode("", "", "children", fields); + + // root + fields = ArrayListMultimap.create(); + fields.put("children", outerChildren); + fields.put("param1", new SchedulerConfNode("", "value1", "param1", null)); + fields.put("param2", new SchedulerConfNode("", "value2", "param2", null)); + SchedulerConfNode root = new SchedulerConfNode("", "", "root", fields); + + return root; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/test-capacity-scheduler-hierarchy.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/test-capacity-scheduler-hierarchy.xml new file mode 100644 index 0000000..1b5d9ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/test-capacity-scheduler-hierarchy.xml @@ -0,0 +1,54 @@ + + + + + + capacity + 9999 + + + 0.3 + + + + org.apache.hadoop.yarn.util.resource.DominantResourceCalculator + + + + + + + RUNNING + * + * + x + + + 2 + 50 + 90 + 30 + + + 20 + 50 + + + + + + + + \ No newline at end of file