diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index d4ab8f50021..269942fae66 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -202,6 +202,10 @@ + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java new file mode 100644 index 00000000000..735a93c6314 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Load based policy that generates weighted policies by scaling + * the cluster load (based on pending) to a weight from 0.0 to 1.0. + */ +public class LoadBasedGlobalPolicy extends GlobalPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(LoadBasedGlobalPolicy.class); + + private static final String FEDERATION_GPG_LOAD_BASED_PREFIX = + YarnConfiguration.FEDERATION_GPG_PREFIX + "policy.generator.load-based."; + + public static final String FEDERATION_GPG_LOAD_BASED_MIN_PENDING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.minimum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING = 2000; + + public static final String FEDERATION_GPG_LOAD_BASED_MAX_PENDING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.maximum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING = 3000; + + public static final String FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT = + FEDERATION_GPG_LOAD_BASED_PREFIX + "weight.minimum"; + public static final float DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT = 0.0f; + + public static final String FEDERATION_GPG_LOAD_BASED_MAX_EDIT = + FEDERATION_GPG_LOAD_BASED_PREFIX + "edit.maximum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT = 3; + + public static final String FEDERATION_GPG_LOAD_BASED_SCALING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "scaling"; + public static final String DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING = + "linear"; + + private int minPending; + private int maxPending; + private float minWeight; + private int maxEdit; + private String scaling; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + minPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MIN_PENDING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING); + maxPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_PENDING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING); + minWeight = conf.getFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT); + maxEdit = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT); + scaling = conf.get(FEDERATION_GPG_LOAD_BASED_SCALING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING); + // Check that all configuration values are valid + if (!(minPending <= maxPending)) { + throw new YarnRuntimeException("minPending=" + minPending + + " must be less than or equal to maxPending=" + maxPending); + } + if (!(minWeight >= 0 && minWeight < 1)) { + throw new YarnRuntimeException( + "minWeight=" + minWeight + " must be within range [0,1)"); + } + } + + @Override + protected Map registerPaths() { + // Register for the endpoints we want to receive information on + Map map = new HashMap<>(); + map.put(ClusterMetricsInfo.class, RMWSConsts.METRICS); + return map; + } + + @Override + protected FederationPolicyManager updatePolicy(String queueName, + Map> clusterInfo, + FederationPolicyManager currentManager) { + Map clusterMetrics = new HashMap<>(); + for (Map.Entry> e : clusterInfo + .entrySet()) { + clusterMetrics.put(e.getKey(), + (ClusterMetricsInfo) e.getValue().get(ClusterMetricsInfo.class)); + } + if (currentManager == null) { + LOG.info("Creating load based weighted policy queue {}", queueName); + Map weights = getTargetWeights(clusterMetrics); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(queueName); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights(weights); + manager.getWeightedPolicyInfo().setRouterPolicyWeights(weights); + currentManager = manager; + } else if (currentManager instanceof WeightedLocalityPolicyManager) { + Map weights = getTargetWeights(clusterMetrics); + LOG.info("Updating policy for queue {} based on cluster load to: {}", + queueName, weights); + WeightedLocalityPolicyManager manager = + (WeightedLocalityPolicyManager) currentManager; + manager.getWeightedPolicyInfo().setAMRMPolicyWeights(weights); + manager.getWeightedPolicyInfo().setRouterPolicyWeights(weights); + } else { + LOG.warn("Policy for queue {} is of type {}, expected {}", queueName, + currentManager.getClass(), WeightedLocalityPolicyManager.class); + } + return currentManager; + } + + @VisibleForTesting + protected Map getTargetWeights( + Map clusterMetrics) { + Map weights = + GPGUtils.createUniformWeights(clusterMetrics.keySet()); + List scs = new ArrayList<>(clusterMetrics.keySet()); + // Sort the sub clusters into descending order based on pending load + scs.sort(new SortByDescendingLoad(clusterMetrics)); + // Keep the top N loaded sub clusters + scs = scs.subList(0, Math.min(maxEdit, scs.size())); + for (SubClusterId sc : scs) { + LOG.info("Updating weight for sub cluster {}", sc.toString()); + int pending = clusterMetrics.get(sc).getAppsPending(); + if (pending <= minPending) { + LOG.info("Load ({}) is lower than minimum ({}), skipping", pending, + minPending); + } else if (pending < maxPending) { + float weight = 1.0f; + // The different scaling strategies should all map values from the + // range min_pending+1 to max_pending to the range min_weight to 1.0f + // so we pre process and simplify the domain to some value [1, MAX-MIN) + int val = pending - minPending; + int maxVal = maxPending - minPending; + switch (scaling) { + default: + LOG.warn("Unknown scaling type [{}], defaulting to 1.0f", scaling); + break; + case "linear": + weight = (float) (maxVal - val) / (float) (maxVal); + break; + case "quadratic": + double maxValQuad = Math.pow(maxVal, 2); + double valQuad = Math.pow(val, 2); + weight = (float) (maxValQuad - valQuad) / (float) (maxValQuad); + break; + case "log": + double maxValLog = Math.log(maxVal); + double valLog = Math.log(val); + weight = (float) (maxValLog - valLog) / (float) (maxValLog); + break; + } + // Scale the weights to respect the config minimum + weight = weight * (1.0f - minWeight); + weight += minWeight; + weights.put(new SubClusterIdInfo(sc), weight); + LOG.info("Load ({}) is within maximum ({}), setting weights via {} " + + "scale to {}", pending, maxPending, scaling, weight); + } else { + weights.put(new SubClusterIdInfo(sc), minWeight); + LOG.info( + "Load ({}) exceeded maximum ({}), setting weight to minimum: {}", + pending, maxPending, minWeight); + } + } + return weights; + } + + private static final class SortByDescendingLoad + implements Comparator { + + private Map clusterMetrics; + + private SortByDescendingLoad( + Map clusterMetrics) { + this.clusterMetrics = clusterMetrics; + } + + public int compare(SubClusterId a, SubClusterId b) { + // Sort by pending load + return clusterMetrics.get(b).getAppsPending() - clusterMetrics.get(a) + .getAppsPending(); + } + } + +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java new file mode 100644 index 00000000000..38cf92e209c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit test for the Load Based Global Policy. + */ +public class TestLoadBasedGlobalPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(TestLoadBasedGlobalPolicy.class); + + private static final int NUM_SC = 3; + private static final float DELTA = 0.00001f; + + private static final int MIN_PENDING = 100; + private static final int MAX_PENDING = 500; + + private List subClusterIds; + private Map clusterMetricsInfos; + private Map weights; + + private Configuration conf; + private LoadBasedGlobalPolicy policyGenerator; + + public TestLoadBasedGlobalPolicy() { + conf = new Configuration(); + policyGenerator = new LoadBasedGlobalPolicy(); + } + + @Before + public void setUp() throws IOException, YarnException { + + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 2); + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_PENDING, + MIN_PENDING); + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_PENDING, + MAX_PENDING); + conf.setFloat(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + 0.0f); + conf.set(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_SCALING, "linear"); + policyGenerator.setConf(conf); + + subClusterIds = new ArrayList(); + clusterMetricsInfos = new HashMap(); + // Set up sub clusters + for (int i = 0; i < NUM_SC; ++i) { + // Sub cluster Id + SubClusterId id = SubClusterId.newInstance("sc" + i); + subClusterIds.add(id); + + // Cluster metrics info + ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo(); + metricsInfo.setAppsPending(50); + clusterMetricsInfos.put(id, metricsInfo); + } + } + + @Test + public void testSimpleTargetWeights() { + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(1.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testLoadTargetWeights() { + getMetric(0).setAppsPending(100); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(1.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + getMetric(0).setAppsPending(500); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testMaxEdit() { + // The policy should be able to edit 2 weights + getMetric(0).setAppsPending(MAX_PENDING + 200); + getMetric(1).setAppsPending(MAX_PENDING + 100); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(0.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + // After updating the config, it should only edit the most loaded + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 1); + policyGenerator.setConf(conf); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testMinWeight() { + // If a minimum weight is set, the generator should not go below it + conf.setFloat(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + 0.5f); + policyGenerator.setConf(conf); + getMetric(0).setAppsPending(Integer.MAX_VALUE); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.5, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testScaling() { + LOG.info("Testing that the generator weights are monotonically" + + " decreasing regardless of scaling method"); + for (String scaling : new String[] {"linear", "quadratic", "log"}) { + LOG.info("Testing {} scaling...", scaling); + conf.set(LoadBasedGlobalPolicy.DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING, + scaling); + policyGenerator.setConf(conf); + // Test a continuous range for scaling + float prevWeight = 1.01f; + for (int load = 0; load < MAX_PENDING * 2; ++load) { + getMetric(0).setAppsPending(load); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + if (load < MIN_PENDING) { + // Below the minimum load, it should stay 1.0f + assertEquals(1.0f, getWeight(0), DELTA); + } else if (load < MAX_PENDING) { + // In the specified range, the weight should consistently decrease + float weight = getWeight(0); + assertTrue(weight < prevWeight); + prevWeight = weight; + } else { + // Above the maximum load, it should stay 0.0f + assertEquals(0.0f, getWeight(0), DELTA); + } + } + } + } + + private float getWeight(int sc) { + return weights.get(new SubClusterIdInfo(subClusterIds.get(sc))); + } + + private ClusterMetricsInfo getMetric(int sc) { + return clusterMetricsInfos.get(subClusterIds.get(sc)); + } +} +