diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index d4ab8f50021..6303f165ea4 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -202,6 +202,10 @@ + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index eabe413872a..90270a39276 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3160,7 +3160,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; - private static final String FEDERATION_GPG_PREFIX = + public static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg."; // The number of threads to use for the GPG scheduled executor service @@ -3178,6 +3178,37 @@ public static boolean isAclEnabled(Configuration conf) { FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000; + /** The interval at which the policy generator runs, default is one hour. */ + public static final String GPG_POLICY_GENERATOR_INTERVAL_MS = + FEDERATION_GPG_PREFIX + "policy.generator.interval-ms"; + public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS = -1; + + /** + * The configured policy generator class, runs DefaultGlobalPolicy by + * default. 
+ */ + public static final String GPG_GLOBAL_POLICY_CLASS = + FEDERATION_GPG_PREFIX + "policy.generator.class"; + public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS = + "org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator." + + "DefaultGlobalPolicy"; + + /** + * Whether or not the policy generator is running in read only (won't modify + * policies), default is false. + */ + public static final String GPG_POLICY_GENERATOR_READONLY = + FEDERATION_GPG_PREFIX + "policy.generator.readonly"; + public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY = + false; + + /** + * Which sub-clusters the policy generator should blacklist. + */ + public static final String GPG_POLICY_GENERATOR_BLACKLIST = + FEDERATION_GPG_PREFIX + "policy.generator.blacklist"; + + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 899c210b390..3168d2199fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3425,6 +3425,46 @@ 1800000 + + + The interval at which the policy generator runs, default is one hour + + yarn.federation.gpg.policy.generator.interval-ms + 3600000 + + + + + The configured policy generator class, runs DefaultGlobalPolicy by default + + yarn.federation.gpg.policy.generator.class + org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.DefaultGlobalPolicy + + + + + Whether or not the policy generator is running in read only (won't modify policies), default is false + + yarn.federation.gpg.policy.generator.readonly + false + + + + + Whether or not the policy generator is running in read only (won't modify policies), default is false + + 
yarn.federation.gpg.policy.generator.readonly + false + + + + + Which subclusters the gpg should blacklist, default is none + + yarn.federation.gpg.policy.generator.blacklist + + + It is TimelineClient 1.5 configuration whether to store active diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index ef7711412de..e8ab8b8c598 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -372,6 +373,18 @@ public SubClusterPolicyConfiguration getPolicyConfiguration( } } + /** + * Set a policy configuration into the state store. 
+ * + * @param policyConf the policy configuration to set + * @throws YarnException if the request is invalid/fails + */ + public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf) + throws YarnException { + stateStore.setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest.newInstance(policyConf)); + } + /** * Adds the home {@link SubClusterId} for the specified {@link ApplicationId}. * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml index 9bbb936e7b9..9398b0b3a28 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml @@ -61,6 +61,12 @@ test + + org.apache.hadoop + hadoop-yarn-server-timelineservice + provided + + org.apache.hadoop hadoop-yarn-server-resourcemanager @@ -72,6 +78,12 @@ test + + org.mockito + mockito-all + test + + org.apache.hadoop hadoop-yarn-server-common @@ -92,6 +104,12 @@ org.apache.rat apache-rat-plugin + + + src/test/resources/schedulerInfo1.json + src/test/resources/schedulerInfo2.json + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java index da8a383bd75..6b0a5a43112 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java @@ -28,4 +28,8 @@ FederationStateStoreFacade getStateStoreFacade(); void setStateStoreFacade(FederationStateStoreFacade facade); + + GPGPolicyFacade getPolicyFacade(); + + void setPolicyFacade(GPGPolicyFacade facade); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java index 3884ace9cef..bb498448fae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java @@ -26,6 +26,7 @@ public class GPGContextImpl implements GPGContext { private FederationStateStoreFacade facade; + private GPGPolicyFacade policyFacade; @Override public FederationStateStoreFacade getStateStoreFacade() { @@ -38,4 +39,13 @@ public void setStateStoreFacade( this.facade = federationStateStoreFacade; } + @Override + public GPGPolicyFacade getPolicyFacade(){ + return policyFacade; + } + + @Override + public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){ + policyFacade = gpgPolicyfacade; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java new file mode 100644 index 00000000000..90391cd61cc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * A utility class for the GPG Policy Generator to read and write policies + * into the state store. Policy specific logic is abstracted away in this + * class, so the PolicyGenerator can avoid dealing with policy construction, + * reinitialization, and serialization. + * + * There are only two exposed methods: + * + * {@link #getPolicyManager(String)} + * Gets the PolicyManager via queue name. Null if there is no policy + * configured for the specified queue. The PolicyManager can be used to + * extract the {@link FederationRouterPolicy} and + * {@link FederationAMRMProxyPolicy}, as well as any policy specific parameters + * + * {@link #setPolicyManager(FederationPolicyManager)} + * Sets the PolicyManager. If the policy configuration is the same, no change + * occurs. 
Otherwise, the internal cache is updated and the new configuration + * is written into the state store + * + * This class assumes that the GPG is the only service + * writing policies. Thus, the only state store reads occur the first time a + * queue policy is retrieved - after that, the GPG only writes to the state + * store. + * + * The class uses a PolicyManager cache and a SubClusterPolicyConfiguration + * cache. The primary use for these caches are to serve reads, and to + * identify when the PolicyGenerator has actually changed the policy + * so unnecessary state store policy writes can be avoided. + */ + +public class GPGPolicyFacade { + + private static final Logger LOG = + LoggerFactory.getLogger(GPGPolicyFacade.class); + + private FederationStateStoreFacade stateStore; + + private Map policyManagerMap; + private Map policyConfMap; + + private boolean readOnly; + + public GPGPolicyFacade(FederationStateStoreFacade stateStore, + Configuration conf) { + this.stateStore = stateStore; + this.policyManagerMap = new HashMap<>(); + this.policyConfMap = new HashMap<>(); + this.readOnly = + conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, + YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY); + } + + /** + * Provides a utility for the policy generator to read the policy manager + * from the state store. Because the policy generator should be the only + * component updating the policy, this implementation does not use the + * reinitialization feature. + * + * @param queueName the name of the queue we want the policy manager for. + * @return the policy manager responsible for the queue policy. 
+ */ + public FederationPolicyManager getPolicyManager(String queueName) + throws YarnException { + FederationPolicyManager policyManager = policyManagerMap.get(queueName); + // If we don't have the policy manager cached, pull configuration + // from the state store to create and cache it + if (policyManager == null) { + try { + // If we don't have the configuration cached, pull it + // from the stateStore + SubClusterPolicyConfiguration conf = policyConfMap.get(queueName); + if (conf == null) { + conf = stateStore.getPolicyConfiguration(queueName); + } + // If configuration is still null, it does not exist in the state store + if (conf == null) { + LOG.info("Read null policy for queue {}", queueName); + return null; + } + policyManager = + FederationPolicyUtils.instantiatePolicyManager(conf.getType()); + policyManager.setQueue(queueName); + + // TODO there is currently no way to cleanly deserialize a policy + // manager sub type from just the configuration + if (policyManager instanceof WeightedLocalityPolicyManager) { + WeightedPolicyInfo wpinfo = + WeightedPolicyInfo.fromByteBuffer(conf.getParams()); + WeightedLocalityPolicyManager wlpmanager = + (WeightedLocalityPolicyManager) policyManager; + LOG.info("Updating policy for queue {} to configured weights router: " + + "{}, amrmproxy: {}", queueName, + wpinfo.getRouterPolicyWeights(), + wpinfo.getAMRMPolicyWeights()); + wlpmanager.setWeightedPolicyInfo(wpinfo); + } else { + LOG.warn("Warning: FederationPolicyManager of unsupported type {}, " + + "initialization may be incomplete ", policyManager.getClass()); + } + + policyManagerMap.put(queueName, policyManager); + policyConfMap.put(queueName, conf); + } catch (YarnException e) { + LOG.error("Error reading SubClusterPolicyConfiguration from state " + + "store for queue: {}", queueName); + throw e; + } + } + return policyManager; + } + + /** + * Provides a utility for the policy generator to write a policy manager + * into the state store. 
The facade keeps a cache and will only write into + * the state store if the policy configuration has changed. + * + * @param policyManager The policy manager we want to update into the state + * store. It contains policy information as well as + * the queue name we will update for. + */ + public void setPolicyManager(FederationPolicyManager policyManager) + throws YarnException { + if (policyManager == null) { + LOG.warn("Attempting to set null policy manager"); + return; + } + // Extract the configuration from the policy manager + String queue = policyManager.getQueue(); + SubClusterPolicyConfiguration conf; + try { + conf = policyManager.serializeConf(); + } catch (FederationPolicyInitializationException e) { + LOG.warn("Error serializing policy for queue {}", queue); + throw e; + } + if (conf == null) { + // State store does not currently support setting a policy back to null + // because it reads the queue name to set from the policy! + LOG.warn("Skip setting policy to null for queue {} into state store", + queue); + return; + } + // Compare with configuration cache, if different, write the conf into + // store and update our conf and manager cache + if (!confCacheEqual(queue, conf)) { + try { + if (readOnly) { + LOG.info("[read-only] Skipping policy update for queue {}", queue); + return; + } + LOG.info("Updating policy for queue {} into state store", queue); + stateStore.setPolicyConfiguration(conf); + policyConfMap.put(queue, conf); + policyManagerMap.put(queue, policyManager); + } catch (YarnException e) { + LOG.warn("Error writing SubClusterPolicyConfiguration to state " + + "store for queue: {}", queue); + throw e; + } + } else { + LOG.info("Setting unchanged policy - state store write skipped"); + } + } + + /** + * @param queue + * @param conf + * @return whether or not the conf is equal to the cached conf + */ + private boolean confCacheEqual(String queue, + SubClusterPolicyConfiguration conf) { + SubClusterPolicyConfiguration cachedConf = 
policyConfMap.get(queue); + if (conf == null && cachedConf == null) { + return true; + } else if (conf != null && cachedConf != null) { + if (conf.equals(cachedConf)) { + return true; + } + } + return false; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java new file mode 100644 index 00000000000..70e3588ac3c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; + +/** + * GPGUtils contains utility functions for the GPG. + * + */ +public final class GPGUtils { + + // hide constructor + private GPGUtils() { + } + + /** + * Performs an invocation of the the remote RMWebService. + */ + public static T invokeRMWebService(Configuration conf, String webAddr, + String path, final Class returnType) { + Client client = Client.create(); + T obj = null; + + WebResource webResource = client.resource(webAddr); + ClientResponse response = webResource.path("ws/v1/cluster").path(path) + .accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + if (response.getStatus() == HttpServletResponse.SC_OK) { + obj = response.getEntity(returnType); + } else { + throw new YarnRuntimeException("Bad response from remote web service: " + + response.getStatus()); + } + return obj; + } + + /** + * Creates a default weighting. 
+ */ + public static Map createUniformWeights( + Set ids) { + Map weights = + new HashMap<>(); + for(SubClusterId id : ids) { + weights.put(new SubClusterIdInfo(id), 1.0f); + } + return weights; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index f6cfba027db..88b9f2bd30b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator; import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ // Scheduler service that runs tasks periodically private ScheduledThreadPoolExecutor scheduledExecutorService; private SubClusterCleaner subClusterCleaner; + private PolicyGenerator policyGenerator; public GlobalPolicyGenerator() { super(GlobalPolicyGenerator.class.getName()); @@ -73,11 +75,15 @@ protected void serviceInit(Configuration conf) throws Exception { // Set up the context this.gpgContext .setStateStoreFacade(FederationStateStoreFacade.getInstance()); + this.gpgContext + .setPolicyFacade(new 
GPGPolicyFacade( + this.gpgContext.getStateStoreFacade(), conf)); this.scheduledExecutorService = new ScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); + this.policyGenerator = new PolicyGenerator(conf, this.gpgContext); DefaultMetricsSystem.initialize(METRICS_NAME); @@ -99,6 +105,17 @@ protected void serviceStart() throws Exception { LOG.info("Scheduled sub-cluster cleaner with interval: {}", DurationFormatUtils.formatDurationISO(scCleanerIntervalMs)); } + + // Schedule PolicyGenerator + long policyGeneratorIntervalMillis = getConfig().getLong( + YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS); + if(policyGeneratorIntervalMillis > 0){ + this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator, + 0, policyGeneratorIntervalMillis, TimeUnit.MILLISECONDS); + LOG.info("Scheduled policygenerator with interval: {}", + DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis)); + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/DefaultGlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/DefaultGlobalPolicy.java new file mode 100644 index 00000000000..6a3446e8089 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/DefaultGlobalPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Default policy that does nothing. 
+ */ +public class DefaultGlobalPolicy extends GlobalPolicy{ + + private static final Logger LOG = + LoggerFactory.getLogger(DefaultGlobalPolicy.class); + + @Override + public FederationPolicyManager updatePolicy(String queueName, + Map> clusterInfo, + FederationPolicyManager manager) { + LOG.info("Updating policy for queue {}", queueName); + return null; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java new file mode 100644 index 00000000000..e698e872f60 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +import java.util.Map; + +/** + * This interface defines the plug-able policy that the PolicyGenerator uses + * to update policies into the state store. + */ + +public abstract class GlobalPolicy implements Configurable { + + private Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * Given a map of class type to string, write into the map the object type + * and RM path to request it from - the framework will query these paths + * and provide the objects to the policy. Delegating this responsibility to + * the PolicyGenerator enables us avoid duplicate calls to the same + * endpoints as the GlobalPolicy is invoked once per queue. + */ + protected void registerPaths(Map paths) { + // Default register nothing + } + + /** + * Given a queue, cluster metrics, and policy manager, update the policy + * to account for the cluster status. This method defines the policy generator + * behavior. 
+ * + * @param queueName name of the queue + * @param clusterInfo subClusterId map to cluster information about the + * sub cluster used to make policy decisions + * @param manager the FederationPolicyManager for the queue's existing + * policy the manager may be null, in which case the policy + * will need to be created + * @return policy manager that handles the updated (or created) policy + */ + protected abstract FederationPolicyManager updatePolicy(String queueName, + Map> clusterInfo, + FederationPolicyManager manager); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java new file mode 100644 index 00000000000..c25c26df620 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Load based policy that generates weighted policies by scaling + * the cluster load (based on pending) to a weight from 0.0 to 1.0. 
+ */ +public class LoadBasedGlobalPolicy extends GlobalPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(LoadBasedGlobalPolicy.class); + + private static final String FEDERATION_GPG_LOAD_BASED_PREFIX = + YarnConfiguration.FEDERATION_GPG_PREFIX + "policy.generator.load-based."; + + public static final String FEDERATION_GPG_LOAD_BASED_MIN_PENDING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.minimum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING = 2000; + + public static final String FEDERATION_GPG_LOAD_BASED_MAX_PENDING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.maximum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING = 3000; + + public static final String FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT = + FEDERATION_GPG_LOAD_BASED_PREFIX + "weight.minimum"; + public static final float DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT = 0.0f; + + public static final String FEDERATION_GPG_LOAD_BASED_MAX_EDIT = + FEDERATION_GPG_LOAD_BASED_PREFIX + "edit.maximum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT = 3; + + public static final String FEDERATION_GPG_LOAD_BASED_SCALING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "scaling"; + public static final String DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING = + "linear"; + + private int minPending; + private int maxPending; + private float minWeight; + private int maxEdit; + private String scaling; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + minPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MIN_PENDING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING); + maxPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_PENDING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING); + minWeight = conf.getFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT); + maxEdit = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT); + scaling = 
conf.get(FEDERATION_GPG_LOAD_BASED_SCALING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING); + // Check that all configuration values are valid + if (!(minPending <= maxPending)) { + throw new YarnRuntimeException("minPending=" + minPending + + " must be less than or equal to maxPending=" + maxPending); + } + if (!(minWeight >= 0 && minWeight < 1)) { + throw new YarnRuntimeException( + "minWeight=" + minWeight + " must be within range [0,1)"); + } + } + + @Override + protected void registerPaths(Map map) { + // Register for the endpoints we want to receive information on + map.put(ClusterMetricsInfo.class, RMWSConsts.METRICS); + } + + @Override + protected FederationPolicyManager updatePolicy(String queueName, + Map> clusterInfo, + FederationPolicyManager currentManager) { + Map clusterMetrics = new HashMap<>(); + for (Map.Entry> e : clusterInfo + .entrySet()) { + clusterMetrics.put(e.getKey(), + (ClusterMetricsInfo) e.getValue().get(ClusterMetricsInfo.class)); + } + if (currentManager == null) { + LOG.info("Creating load based weighted policy queue {}", queueName); + Map weights = getTargetWeights(clusterMetrics); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(queueName); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights(weights); + manager.getWeightedPolicyInfo().setRouterPolicyWeights(weights); + currentManager = manager; + } else if (currentManager instanceof WeightedLocalityPolicyManager) { + Map weights = getTargetWeights(clusterMetrics); + LOG.info("Updating policy for queue {} based on cluster load to: {}", + queueName, weights); + WeightedLocalityPolicyManager manager = + (WeightedLocalityPolicyManager) currentManager; + manager.getWeightedPolicyInfo().setAMRMPolicyWeights(weights); + manager.getWeightedPolicyInfo().setRouterPolicyWeights(weights); + } else { + LOG.warn("Policy for queue {} is of type {}, expected {}", queueName, + currentManager.getClass(), WeightedLocalityPolicyManager.class); + } + 
return currentManager; + } + + @VisibleForTesting + protected Map getTargetWeights( + Map clusterMetrics) { + Map weights = + GPGUtils.createUniformWeights(clusterMetrics.keySet()); + List scs = new ArrayList<>(clusterMetrics.keySet()); + // Sort the sub clusters into descending order based on pending load + scs.sort(new SortByDescendingLoad(clusterMetrics)); + // Keep the top N loaded sub clusters + scs = scs.subList(0, Math.min(maxEdit, scs.size())); + for (SubClusterId sc : scs) { + LOG.info("Updating weight for sub cluster {}", sc.toString()); + int pending = clusterMetrics.get(sc).getAppsPending(); + if (pending <= minPending) { + LOG.info("Load ({}) is lower than minimum ({}), skipping", pending, + minPending); + } else if (pending < maxPending) { + float weight = 1.0f; + // The different scaling strategies should all map values from the + // range min_pending+1 to max_pending to the range min_weight to 1.0f + // so we pre process and simplify the domain to some value [1, MAX-MIN) + int val = pending - minPending; + int maxVal = maxPending - minPending; + switch (scaling) { + default: + LOG.warn("Unknown scaling type [{}], defaulting to 1.0f", scaling); + break; + case "linear": + weight = (float) (maxVal - val) / (float) (maxVal); + break; + case "quadratic": + double maxValQuad = Math.pow(maxVal, 2); + double valQuad = Math.pow(val, 2); + weight = (float) (maxValQuad - valQuad) / (float) (maxValQuad); + break; + case "log": + double maxValLog = Math.log(maxVal); + double valLog = Math.log(val); + weight = (float) (maxValLog - valLog) / (float) (maxValLog); + break; + } + // Scale the weights to respect the config minimum + weight = weight * (1.0f - minWeight); + weight += minWeight; + weights.put(new SubClusterIdInfo(sc), weight); + LOG.info("Load ({}) is within maximum ({}), setting weights via {} " + + "scale to {}", pending, maxPending, scaling, weight); + } else { + weights.put(new SubClusterIdInfo(sc), minWeight); + LOG.info( + "Load ({}) exceeded 
maximum ({}), setting weight to minimum: {}", + pending, maxPending, minWeight); + } + } + return weights; + } + + private static final class SortByDescendingLoad + implements Comparator { + + private Map clusterMetrics; + + private SortByDescendingLoad( + Map clusterMetrics) { + this.clusterMetrics = clusterMetrics; + } + + public int compare(SubClusterId a, SubClusterId b) { + // Sort by pending load + return clusterMetrics.get(b).getAppsPending() - clusterMetrics.get(a) + .getAppsPending(); + } + } + +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java new file mode 100644 index 00000000000..494e7fb5241 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * The PolicyGenerator runs periodically and updates the policy configuration + * for each queue into the state store. This is a plug-able component - + * subclasses should override createPolicy and updatePolicy to define + * specific behavior. 
+ */ + +public class PolicyGenerator implements Runnable, Configurable { + + private static final Logger LOG = + LoggerFactory.getLogger(PolicyGenerator.class); + + private GPGContext gpgContext; + private Configuration conf; + + // Information request map + private Map pathMap = new HashMap<>(); + + // Global policy instance + @VisibleForTesting + protected GlobalPolicy policy; + + /** + * The PolicyGenerator periodically reads sub cluster load and updates + * policies into the state store. + */ + public PolicyGenerator(Configuration conf, GPGContext context) { + setConf(conf); + init(context); + } + + private void init(GPGContext context) { + this.gpgContext = context; + LOG.info("Initialized PolicyGenerator"); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.policy = FederationStateStoreFacade + .createInstance(conf, YarnConfiguration.GPG_GLOBAL_POLICY_CLASS, + YarnConfiguration.DEFAULT_GPG_GLOBAL_POLICY_CLASS, + GlobalPolicy.class); + policy.setConf(conf); + policy.registerPaths(pathMap); + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public final void run() { + Map activeSubClusters; + try { + activeSubClusters = gpgContext.getStateStoreFacade().getSubClusters(true); + } catch (YarnException e) { + LOG.error("Error retrieving active sub-clusters", e); + return; + } + + // Parse the scheduler information from all the SCs + Map schedInfo = + getSchedulerInfo(activeSubClusters); + + // Extract and enforce that all the schedulers have matching type + Set queueNames = extractQueues(schedInfo); + + // Remove black listed sub clusters + activeSubClusters.keySet().removeAll(getBlackList()); + LOG.info("Active non-blacklist sub-clusters: {}", + activeSubClusters.keySet()); + + // Get cluster metrics information from non black listed RMs - later used + // to evaluate sub cluster load + Map> clusterInfo = + getInfos(activeSubClusters); + + // Update into the federation state store + for 
(String queueName : queueNames) { + // Retrieve the manager from the policy facade + FederationPolicyManager manager; + try { + manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName); + } catch (YarnException e) { + LOG.error("GetPolicy for queue {} failed", queueName, e); + continue; + } + LOG.info("Updating policy for queue {}", queueName); + manager = policy.updatePolicy(queueName, clusterInfo, manager); + try { + this.gpgContext.getPolicyFacade().setPolicyManager(manager); + } catch (YarnException e) { + LOG.error("SetPolicy for queue {} failed", queueName, e); + } + } + } + + /** + * Helper to retrieve metrics from the RM REST endpoints. + * + * @param activeSubClusters A map of active sub cluster IDs to info + */ + @VisibleForTesting + protected Map> getInfos( + Map activeSubClusters) { + + Map> clusterInfo = new HashMap<>(); + for (SubClusterInfo sci : activeSubClusters.values()) { + for (Map.Entry e : this.pathMap.entrySet()) { + if (!clusterInfo.containsKey(sci.getSubClusterId())) { + clusterInfo.put(sci.getSubClusterId(), new HashMap()); + } + Object ret = GPGUtils + .invokeRMWebService(conf, sci.getRMWebServiceAddress(), + e.getValue(), e.getKey()); + clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret); + } + } + + return clusterInfo; + } + + /** + * Helper to retrieve SchedulerInfos. + * + * @param activeSubClusters A map of active sub cluster IDs to info + */ + @VisibleForTesting + protected Map getSchedulerInfo( + Map activeSubClusters) { + Map schedInfo = + new HashMap<>(); + for (SubClusterInfo sci : activeSubClusters.values()) { + // TODO parallel + SchedulerTypeInfo sti = GPGUtils + .invokeRMWebService(conf, sci.getRMWebServiceAddress(), + RMWSConsts.SCHEDULER, SchedulerTypeInfo.class); + schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo()); + } + return schedInfo; + } + + /** + * Helper to get a set of blacklisted sub cluster Ids from configuration. 
+ */ + private Set getBlackList() { + String blackListParam = + conf.get(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST, ""); + Set blackList = new HashSet<>(); + for (String id : blackListParam.split(",")) { + blackList.add(SubClusterId.newInstance(id)); + } + return blackList; + } + + /** + * Given the scheduler information for all RMs, extract the union of + * queue names - right now we only consider instances of capacity scheduler. + * + * @param schedInfo the scheduler information + * @return + */ + private Set extractQueues( + Map schedInfo) { + Set queueNames = new HashSet(); + for (Map.Entry entry : schedInfo.entrySet()) { + // TODO workaround - eventually the RMs should write + // queue structure into the state store. + if (entry.getValue() instanceof CapacitySchedulerInfo) { + // Flatten the queue structure and get only non leaf queues + queueNames.addAll(flattenQueue((CapacitySchedulerInfo) entry.getValue()) + .get(CapacitySchedulerQueueInfo.class)); + } else { + LOG.warn("Skipping sub cluster {}, not configured with capacity " + + "scheduler", entry.getKey()); + } + } + return queueNames; + } + + // Helpers to flatten the queue structure into a multimap of + // queue type to set of queue names + private Map> flattenQueue(CapacitySchedulerInfo csi) { + Map> flattened = new HashMap>(); + addOrAppend(flattened, csi.getClass(), csi.getQueueName()); + for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) { + flattenQueue(csqi, flattened); + } + return flattened; + } + + private void flattenQueue(CapacitySchedulerQueueInfo csi, + Map> flattened) { + addOrAppend(flattened, csi.getClass(), csi.getQueueName()); + if (csi.getQueues() != null) { + for (CapacitySchedulerQueueInfo csqi : csi.getQueues() + .getQueueInfoList()) { + flattenQueue(csqi, flattened); + } + } + } + + private void addOrAppend(Map> multimap, K key, V value) { + if (!multimap.containsKey(key)) { + multimap.put(key, new HashSet()); + } + multimap.get(key).add(value); + } 
+ +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java new file mode 100644 index 00000000000..a8059f8f975 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.commons.math3.optim.nonlinear.vector.Weight; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Simple policy that generates and updates uniform weighted locality + * policies. + */ +public class UniformWeightedLocalityGlobalPolicy extends GlobalPolicy{ + + private static final Logger LOG = + LoggerFactory.getLogger(UniformWeightedLocalityGlobalPolicy.class); + + @Override + protected FederationPolicyManager updatePolicy(String queueName, + Map> clusterInfo, + FederationPolicyManager currentManager){ + if(currentManager == null){ + // Set uniform weights for all sub clusters + LOG.info("Creating uniform weighted policy queue {}", queueName); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(queueName); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(clusterInfo.keySet())); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(clusterInfo.keySet())); + currentManager = manager; + } + if(currentManager instanceof WeightedLocalityPolicyManager){ + LOG.info("Updating policy for queue {} to default weights", queueName); + WeightedLocalityPolicyManager wlpmanager = + (WeightedLocalityPolicyManager) currentManager; + wlpmanager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(clusterInfo.keySet())); + wlpmanager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(clusterInfo.keySet())); + } else { + LOG.info("Policy 
for queue {} is of type {}, expected {}", + queueName, currentManager.getClass(), Weight.class); + } + return currentManager; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java new file mode 100644 index 00000000000..ef93c9ad2a0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java new file mode 100644 index 00000000000..45ee266ee36 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Unit test for GPG Policy Facade. 
+ */ +public class TestGPGPolicyFacade { + + private Configuration conf; + private FederationStateStore stateStore; + private FederationStateStoreFacade facade = + FederationStateStoreFacade.getInstance(); + private GPGPolicyFacade policyFacade; + + private Set subClusterIds; + + private SubClusterPolicyConfiguration testConf; + + private static final String TEST_QUEUE = "test-queue"; + + public TestGPGPolicyFacade() { + conf = new Configuration(); + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + subClusterIds = new HashSet<>(); + subClusterIds.add(SubClusterId.newInstance("sc0")); + subClusterIds.add(SubClusterId.newInstance("sc1")); + subClusterIds.add(SubClusterId.newInstance("sc2")); + } + + @Before + public void setUp() throws IOException, YarnException { + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + facade.reinitialize(stateStore, conf); + policyFacade = new GPGPolicyFacade(facade, conf); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + // Add a test policy for test queue + manager.setQueue(TEST_QUEUE); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + testConf = manager.serializeConf(); + stateStore.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest + .newInstance(testConf)); + } + + @After + public void tearDown() throws Exception { + stateStore.close(); + stateStore = null; + } + + @Test + public void testGetPolicy() throws YarnException { + WeightedLocalityPolicyManager manager = + (WeightedLocalityPolicyManager) policyFacade + .getPolicyManager(TEST_QUEUE); + Assert.assertEquals(testConf, manager.serializeConf()); + } + + @Test + public void testSetNewPolicy() throws YarnException { + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(TEST_QUEUE + 
0); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + SubClusterPolicyConfiguration policyConf = manager.serializeConf(); + policyFacade.setPolicyManager(manager); + + manager = + (WeightedLocalityPolicyManager) policyFacade + .getPolicyManager(TEST_QUEUE + 0); + Assert.assertEquals(policyConf, manager.serializeConf()); + } + + @Test + public void testOverwritePolicy() throws YarnException { + subClusterIds.add(SubClusterId.newInstance("sc3")); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(TEST_QUEUE); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + SubClusterPolicyConfiguration policyConf = manager.serializeConf(); + policyFacade.setPolicyManager(manager); + + manager = + (WeightedLocalityPolicyManager) policyFacade + .getPolicyManager(TEST_QUEUE); + Assert.assertEquals(policyConf, manager.serializeConf()); + } + + @Test + public void testWriteCache() throws YarnException { + stateStore = mock(MemoryFederationStateStore.class); + facade.reinitialize(stateStore, conf); + when(stateStore.getPolicyConfiguration(Matchers.any( + GetSubClusterPolicyConfigurationRequest.class))).thenReturn( + GetSubClusterPolicyConfigurationResponse.newInstance(testConf)); + policyFacade = new GPGPolicyFacade(facade, conf); + + // Query once to fill the cache + FederationPolicyManager manager = policyFacade.getPolicyManager(TEST_QUEUE); + // State store should be contacted once + verify(stateStore, times(1)).getPolicyConfiguration( + Matchers.any(GetSubClusterPolicyConfigurationRequest.class)); + + // If we set the same policy, the state store should be untouched + policyFacade.setPolicyManager(manager); + 
verify(stateStore, times(0)).setPolicyConfiguration( + Matchers.any(SetSubClusterPolicyConfigurationRequest.class)); + } + + @Test + public void testReadOnly() throws YarnException { + conf.setBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, true); + stateStore = mock(MemoryFederationStateStore.class); + facade.reinitialize(stateStore, conf); + when(stateStore.getPolicyConfiguration(Matchers.any( + GetSubClusterPolicyConfigurationRequest.class))).thenReturn( + GetSubClusterPolicyConfigurationResponse.newInstance(testConf)); + policyFacade = new GPGPolicyFacade(facade, conf); + + // If we set a policy, the state store should be untouched + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + // Add a test policy for test queue + manager.setQueue(TEST_QUEUE); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + manager.getWeightedPolicyInfo().setRouterPolicyWeights( + GPGUtils.createUniformWeights(subClusterIds)); + policyFacade.setPolicyManager(manager); + verify(stateStore, times(0)).setPolicyConfiguration( + Matchers.any(SetSubClusterPolicyConfigurationRequest.class)); + } + +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java new file mode 100644 index 00000000000..38cf92e209c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit test for the Load Based Global Policy. 
+ */ +public class TestLoadBasedGlobalPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(TestLoadBasedGlobalPolicy.class); + + private static final int NUM_SC = 3; + private static final float DELTA = 0.00001f; + + private static final int MIN_PENDING = 100; + private static final int MAX_PENDING = 500; + + private List subClusterIds; + private Map clusterMetricsInfos; + private Map weights; + + private Configuration conf; + private LoadBasedGlobalPolicy policyGenerator; + + public TestLoadBasedGlobalPolicy() { + conf = new Configuration(); + policyGenerator = new LoadBasedGlobalPolicy(); + } + + @Before + public void setUp() throws IOException, YarnException { + + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 2); + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_PENDING, + MIN_PENDING); + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_PENDING, + MAX_PENDING); + conf.setFloat(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + 0.0f); + conf.set(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_SCALING, "linear"); + policyGenerator.setConf(conf); + + subClusterIds = new ArrayList(); + clusterMetricsInfos = new HashMap(); + // Set up sub clusters + for (int i = 0; i < NUM_SC; ++i) { + // Sub cluster Id + SubClusterId id = SubClusterId.newInstance("sc" + i); + subClusterIds.add(id); + + // Cluster metrics info + ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo(); + metricsInfo.setAppsPending(50); + clusterMetricsInfos.put(id, metricsInfo); + } + } + + @Test + public void testSimpleTargetWeights() { + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(1.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testLoadTargetWeights() { + getMetric(0).setAppsPending(100); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + 
assertEquals(weights.size(), 3); + assertEquals(1.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + getMetric(0).setAppsPending(500); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testMaxEdit() { + // The policy should be able to edit 2 weights + getMetric(0).setAppsPending(MAX_PENDING + 200); + getMetric(1).setAppsPending(MAX_PENDING + 100); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(0.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + // After updating the config, it should only edit the most loaded + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 1); + policyGenerator.setConf(conf); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testMinWeight() { + // If a minimum weight is set, the generator should not go below it + conf.setFloat(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + 0.5f); + policyGenerator.setConf(conf); + getMetric(0).setAppsPending(Integer.MAX_VALUE); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.5, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testScaling() { + LOG.info("Testing that the generator weights are monotonically" + + " decreasing regardless of scaling method"); + for (String scaling : new String[] {"linear", "quadratic", "log"}) { + 
LOG.info("Testing {} scaling...", scaling); + conf.set(LoadBasedGlobalPolicy.DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING, + scaling); + policyGenerator.setConf(conf); + // Test a continuous range for scaling + float prevWeight = 1.01f; + for (int load = 0; load < MAX_PENDING * 2; ++load) { + getMetric(0).setAppsPending(load); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + if (load < MIN_PENDING) { + // Below the minimum load, it should stay 1.0f + assertEquals(1.0f, getWeight(0), DELTA); + } else if (load < MAX_PENDING) { + // In the specified range, the weight should consistently decrease + float weight = getWeight(0); + assertTrue(weight < prevWeight); + prevWeight = weight; + } else { + // Above the maximum load, it should stay 0.0f + assertEquals(0.0f, getWeight(0), DELTA); + } + } + } + } + + private float getWeight(int sc) { + return weights.get(new SubClusterIdInfo(subClusterIds.get(sc))); + } + + private ClusterMetricsInfo getMetric(int sc) { + return clusterMetricsInfos.get(subClusterIds.get(sc)); + } +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java new file mode 100644 index 00000000000..4761652f0e8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;

import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONUnmarshaller;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGPolicyFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit test for the GPG Policy Generator. The state store is mocked and
 * the RM REST calls are stubbed out via {@link TestablePolicyGenerator}, so
 * all tests except {@code testCallRM} run without a real cluster.
 */
public class TestPolicyGenerator {

  private static final int NUM_SC = 3;

  private Configuration conf;
  private FederationStateStore stateStore;
  private FederationStateStoreFacade facade =
      FederationStateStoreFacade.getInstance();

  private List<SubClusterId> subClusterIds;
  private Map<SubClusterId, SubClusterInfo> subClusterInfos;
  // Per sub-cluster: REST DAO class -> fake instance returned to the policy.
  private Map<SubClusterId, Map<Class, Object>> clusterInfos;
  private Map<SubClusterId, SchedulerInfo> schedulerInfos;

  private GPGContext gpgContext;

  private PolicyGenerator policyGenerator;

  public TestPolicyGenerator() {
    conf = new Configuration();
    // Disable the facade cache so each test observes the mocked store
    // immediately.
    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);

    gpgContext = new GPGContextImpl();
    gpgContext.setPolicyFacade(new GPGPolicyFacade(facade, conf));
    gpgContext.setStateStoreFacade(facade);
  }

  @Before
  public void setUp() throws IOException, YarnException, JAXBException {
    subClusterIds = new ArrayList<>();
    subClusterInfos = new HashMap<>();
    clusterInfos = new HashMap<>();
    schedulerInfos = new HashMap<>();

    // Two canned scheduler snapshots; sti2 has an extra "default2" queue.
    CapacitySchedulerInfo sti1 =
        readJSON("src/test/resources/schedulerInfo1.json",
            CapacitySchedulerInfo.class);
    CapacitySchedulerInfo sti2 =
        readJSON("src/test/resources/schedulerInfo2.json",
            CapacitySchedulerInfo.class);

    // Set up sub clusters
    for (int i = 0; i < NUM_SC; ++i) {
      // Sub cluster Id
      SubClusterId id = SubClusterId.newInstance("sc" + i);
      subClusterIds.add(id);

      // Sub cluster info
      SubClusterInfo cluster = SubClusterInfo
          .newInstance(id, "amrm:" + i, "clientrm:" + i, "rmadmin:" + i,
              "rmweb:" + i, SubClusterState.SC_RUNNING, 0, "");
      subClusterInfos.put(id, cluster);

      // Cluster metrics info
      ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo();
      metricsInfo.setAppsPending(2000);
      clusterInfos.computeIfAbsent(id, k -> new HashMap<>())
          .put(ClusterMetricsInfo.class, metricsInfo);

      schedulerInfos.put(id, sti1);
    }

    // Change one of the sub cluster schedulers
    schedulerInfos.put(subClusterIds.get(0), sti2);

    stateStore = mock(FederationStateStore.class);
    when(stateStore.getSubClusters((GetSubClustersInfoRequest) any()))
        .thenReturn(GetSubClustersInfoResponse.newInstance(
            new ArrayList<>(subClusterInfos.values())));
    facade.reinitialize(stateStore, conf);
  }

  @After
  public void tearDown() throws Exception {
    stateStore.close();
    stateStore = null;
  }

  /**
   * Unmarshals a JSON test fixture into the given DAO class using the same
   * mapped-notation context the RM web services use.
   */
  private <T> T readJSON(String pathname, Class<T> classy)
      throws IOException, JAXBException {
    JSONJAXBContext jc =
        new JSONJAXBContext(JSONConfiguration.mapped().build(), classy);
    JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
    String contents = new String(Files.readAllBytes(Paths.get(pathname)));
    return unmarshaller.unmarshalFromJSON(new StringReader(contents), classy);
  }

  @Test
  public void testPolicyGenerator() throws YarnException {
    policyGenerator = new TestablePolicyGenerator();
    policyGenerator.policy = mock(GlobalPolicy.class);
    policyGenerator.run();
    // One queue comes from sti1, the extra "default2" queue from sti2.
    verify(policyGenerator.policy, times(1))
        .updatePolicy("default", clusterInfos, null);
    verify(policyGenerator.policy, times(1))
        .updatePolicy("default2", clusterInfos, null);
  }

  @Test
  public void testBlacklist() throws YarnException {
    conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
        subClusterIds.get(0).toString());
    Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
        new HashMap<>(clusterInfos);
    blacklistedCMI.remove(subClusterIds.get(0));
    policyGenerator = new TestablePolicyGenerator();
    policyGenerator.policy = mock(GlobalPolicy.class);
    policyGenerator.run();
    // The blacklisted sub cluster must be filtered out of the metrics map.
    verify(policyGenerator.policy, times(1))
        .updatePolicy("default", blacklistedCMI, null);
    verify(policyGenerator.policy, times(0))
        .updatePolicy("default", clusterInfos, null);
  }

  @Test
  public void testBlacklistTwo() throws YarnException {
    conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
        subClusterIds.get(0).toString() + "," + subClusterIds.get(1)
            .toString());
    Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
        new HashMap<>(clusterInfos);
    blacklistedCMI.remove(subClusterIds.get(0));
    blacklistedCMI.remove(subClusterIds.get(1));
    policyGenerator = new TestablePolicyGenerator();
    policyGenerator.policy = mock(GlobalPolicy.class);
    policyGenerator.run();
    // Both blacklisted sub clusters must be filtered out of the metrics map.
    verify(policyGenerator.policy, times(1))
        .updatePolicy("default", blacklistedCMI, null);
    verify(policyGenerator.policy, times(0))
        .updatePolicy("default", clusterInfos, null);
  }

  @Test
  public void testExistingPolicy() throws YarnException {
    WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
    // Add a test policy for test queue
    manager.setQueue("default");
    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(GPGUtils
        .createUniformWeights(new HashSet<>(subClusterIds)));
    manager.getWeightedPolicyInfo().setRouterPolicyWeights(GPGUtils
        .createUniformWeights(new HashSet<>(subClusterIds)));
    SubClusterPolicyConfiguration testConf = manager.serializeConf();
    when(stateStore.getPolicyConfiguration(
        GetSubClusterPolicyConfigurationRequest.newInstance("default")))
        .thenReturn(
            GetSubClusterPolicyConfigurationResponse.newInstance(testConf));

    policyGenerator = new TestablePolicyGenerator();
    policyGenerator.policy = mock(GlobalPolicy.class);
    policyGenerator.run();

    // The generator should hand the policy the manager deserialized from the
    // store, not a fresh one.
    ArgumentCaptor<FederationPolicyManager> argCaptor =
        ArgumentCaptor.forClass(FederationPolicyManager.class);
    verify(policyGenerator.policy, times(1))
        .updatePolicy(eq("default"), eq(clusterInfos), argCaptor.capture());
    // FIX: JUnit assertEquals takes (expected, actual) in that order.
    assertEquals(manager.getClass(), argCaptor.getValue().getClass());
    assertEquals(manager.serializeConf(), argCaptor.getValue().serializeConf());
  }

  @Test
  public void testCallRM() {

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();

    final String a = CapacitySchedulerConfiguration.ROOT + ".a";
    final String b = CapacitySchedulerConfiguration.ROOT + ".b";
    final String a1 = a + ".a1";
    final String a2 = a + ".a2";
    final String b1 = b + ".b1";
    final String b2 = b + ".b2";
    final String b3 = b + ".b3";
    float aCapacity = 10.5f;
    float bCapacity = 89.5f;
    float a1Capacity = 30;
    float a2Capacity = 70;
    float b1Capacity = 79.2f;
    float b2Capacity = 0.8f;
    float b3Capacity = 20;

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
        new String[] {"a", "b"});

    csConf.setCapacity(a, aCapacity);
    csConf.setCapacity(b, bCapacity);

    // Define 2nd-level queues
    csConf.setQueues(a, new String[] {"a1", "a2"});
    csConf.setCapacity(a1, a1Capacity);
    csConf.setUserLimitFactor(a1, 100.0f);
    csConf.setCapacity(a2, a2Capacity);
    csConf.setUserLimitFactor(a2, 100.0f);

    csConf.setQueues(b, new String[] {"b1", "b2", "b3"});
    csConf.setCapacity(b1, b1Capacity);
    csConf.setUserLimitFactor(b1, 100.0f);
    csConf.setCapacity(b2, b2Capacity);
    csConf.setUserLimitFactor(b2, 100.0f);
    csConf.setCapacity(b3, b3Capacity);
    csConf.setUserLimitFactor(b3, 100.0f);

    YarnConfiguration rmConf = new YarnConfiguration(csConf);

    ResourceManager resourceManager = new ResourceManager();
    rmConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    resourceManager.init(rmConf);
    resourceManager.start();
    try {
      String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
      SchedulerTypeInfo sti = GPGUtils
          .invokeRMWebService(conf, rmAddress, RMWSConsts.SCHEDULER,
              SchedulerTypeInfo.class);

      Assert.assertNotNull(sti);
    } finally {
      // BUGFIX: the original never stopped the RM, leaking its services and
      // threads into subsequent tests.
      resourceManager.stop();
    }
  }

  /**
   * Testable policy generator overrides the methods that communicate
   * with the RM REST endpoint, allowing us to inject faked responses.
   */
  class TestablePolicyGenerator extends PolicyGenerator {

    TestablePolicyGenerator() {
      super(conf, gpgContext);
    }

    @Override
    protected Map<SubClusterId, Map<Class, Object>> getInfos(
        Map<SubClusterId, SubClusterInfo> activeSubClusters) {
      Map<SubClusterId, Map<Class, Object>> ret = new HashMap<>();
      for (SubClusterId id : activeSubClusters.keySet()) {
        ret.computeIfAbsent(id, k -> new HashMap<>())
            .put(ClusterMetricsInfo.class,
                clusterInfos.get(id).get(ClusterMetricsInfo.class));
      }
      return ret;
    }

    @Override
    protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
        Map<SubClusterId, SubClusterInfo> activeSubClusters) {
      Map<SubClusterId, SchedulerInfo> ret = new HashMap<>();
      for (SubClusterId id : activeSubClusters.keySet()) {
        ret.put(id, schedulerInfos.get(id));
      }
      return ret;
    }
  }
}
"numPendingApplications": 0, + "numActiveApplications": 468, + "AMResourceUsed": { + "memory": 30191616, + "vCores": 468 + }, + "userResourceLimit": { + "memory": 31490048, + "vCores": 7612 + } + } + ] + }, + "userLimitFactor": 1.0, + "AMResourceLimit": { + "memory": 31490048, + "vCores": 7612 + }, + "usedAMResource": { + "memory": 30388224, + "vCores": 532 + }, + "userAMResourceLimit": { + "memory": 31490048, + "vCores": 7612 + }, + "preemptionDisabled": true + } + ] + }, + "health": { + "lastrun": 1517951638085, + "operationsInfo": { + "entry": { + "key": "last-allocation", + "value": { + "nodeId": "node0:0", + "containerId": "container_e61477_1517922128312_0340_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-reservation", + "value": { + "nodeId": "node0:1", + "containerId": "container_e61477_1517879828320_0249_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-release", + "value": { + "nodeId": "node0:2", + "containerId": "container_e61477_1517922128312_0340_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-preemption", + "value": { + "nodeId": "N/A", + "containerId": "N/A", + "queue": "N/A" + } + } + }, + "lastRunDetails": [ + { + "operation": "releases", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + }, + { + "operation": "allocations", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + }, + { + "operation": "reservations", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + } + ] + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json new file mode 100644 index 00000000000..069220ae7ef --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json @@ -0,0 +1,197 @@ + { + "type": "capacityScheduler", + "capacity": 100.0, + "usedCapacity": 0.0, + "maxCapacity": 100.0, + "queueName": "root", + "queues": { + "queue": [ + { + "type": "capacitySchedulerLeafQueueInfo", + "capacity": 100.0, + "usedCapacity": 0.0, + "maxCapacity": 100.0, + "absoluteCapacity": 100.0, + "absoluteMaxCapacity": 100.0, + "absoluteUsedCapacity": 0.0, + "numApplications": 484, + "queueName": "default", + "state": "RUNNING", + "resourcesUsed": { + "memory": 0, + "vCores": 0 + }, + "hideReservationQueues": false, + "nodeLabels": [ + "*" + ], + "numActiveApplications": 484, + "numPendingApplications": 0, + "numContainers": 0, + "maxApplications": 10000, + "maxApplicationsPerUser": 10000, + "userLimit": 100, + "users": { + "user": [ + { + "username": "Default", + "resourcesUsed": { + "memory": 0, + "vCores": 0 + }, + "numPendingApplications": 0, + "numActiveApplications": 468, + "AMResourceUsed": { + "memory": 30191616, + "vCores": 468 + }, + "userResourceLimit": { + "memory": 31490048, + "vCores": 7612 + } + } + ] + }, + "userLimitFactor": 1.0, + "AMResourceLimit": { + "memory": 31490048, + "vCores": 7612 + }, + "usedAMResource": { + "memory": 30388224, + "vCores": 532 + }, + "userAMResourceLimit": { + "memory": 31490048, + "vCores": 7612 + }, + "preemptionDisabled": true + }, + { + "type": "capacitySchedulerLeafQueueInfo", + "capacity": 100.0, + "usedCapacity": 0.0, + "maxCapacity": 100.0, + "absoluteCapacity": 100.0, + "absoluteMaxCapacity": 100.0, + "absoluteUsedCapacity": 0.0, + "numApplications": 484, + "queueName": "default2", + "state": "RUNNING", + "resourcesUsed": { + "memory": 0, + "vCores": 0 + }, + "hideReservationQueues": false, + "nodeLabels": [ + "*" + ], + "numActiveApplications": 484, + "numPendingApplications": 0, + "numContainers": 0, + "maxApplications": 10000, + "maxApplicationsPerUser": 10000, 
+ "userLimit": 100, + "users": { + "user": [ + { + "username": "Default", + "resourcesUsed": { + "memory": 0, + "vCores": 0 + }, + "numPendingApplications": 0, + "numActiveApplications": 468, + "AMResourceUsed": { + "memory": 30191616, + "vCores": 468 + }, + "userResourceLimit": { + "memory": 31490048, + "vCores": 7612 + } + } + ] + }, + "userLimitFactor": 1.0, + "AMResourceLimit": { + "memory": 31490048, + "vCores": 7612 + }, + "usedAMResource": { + "memory": 30388224, + "vCores": 532 + }, + "userAMResourceLimit": { + "memory": 31490048, + "vCores": 7612 + }, + "preemptionDisabled": true + } + ] + }, + "health": { + "lastrun": 1517951638085, + "operationsInfo": { + "entry": { + "key": "last-allocation", + "value": { + "nodeId": "node0:0", + "containerId": "container_e61477_1517922128312_0340_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-reservation", + "value": { + "nodeId": "node0:1", + "containerId": "container_e61477_1517879828320_0249_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-release", + "value": { + "nodeId": "node0:2", + "containerId": "container_e61477_1517922128312_0340_01_000001", + "queue": "root.default" + } + }, + "entry": { + "key": "last-preemption", + "value": { + "nodeId": "N/A", + "containerId": "N/A", + "queue": "N/A" + } + } + }, + "lastRunDetails": [ + { + "operation": "releases", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + }, + { + "operation": "allocations", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + }, + { + "operation": "reservations", + "count": 0, + "resources": { + "memory": 0, + "vCores": 0 + } + } + ] + } + } +