diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index eabe413872a..61c3cd15567 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3160,7 +3160,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
false;
- private static final String FEDERATION_GPG_PREFIX =
+ public static final String FEDERATION_GPG_PREFIX =
FEDERATION_PREFIX + "gpg.";
// The number of threads to use for the GPG scheduled executor service
@@ -3178,6 +3178,38 @@ public static boolean isAclEnabled(Configuration conf) {
FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000;
+ /** The interval at which the policy generator runs, default is one hour. */
+ public static final String GPG_POLICY_GENERATOR_INTERVAL_MS =
+ FEDERATION_GPG_PREFIX + "policy.generator.interval-ms";
+ public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS =
+ 3600000;
+
+ /**
+ * The configured policy generator class, runs DefaultGlobalPolicy by
+ * default.
+ */
+ public static final String GPG_GLOBAL_POLICY_CLASS =
+ FEDERATION_GPG_PREFIX + "policy.generator.class";
+ public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS =
+ "org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator."
+ + "DefaultGlobalPolicy";
+
+ /**
+ * Whether or not the policy generator is running in read only (won't modify
+ * policies), default is false.
+ */
+ public static final String GPG_POLICY_GENERATOR_READONLY =
+ FEDERATION_GPG_PREFIX + "policy.generator.readonly";
+ public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY =
+ false;
+
+ /**
+ * Which sub-clusters the policy generator should blacklist.
+ */
+ public static final String GPG_POLICY_GENERATOR_BLACKLIST =
+ FEDERATION_GPG_PREFIX + "policy.generator.blacklist";
+
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 899c210b390..3168d2199fe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3425,6 +3425,46 @@
1800000
+
+  <property>
+    <description>
+      The interval at which the policy generator runs, default is one hour.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.interval-ms</name>
+    <value>3600000</value>
+  </property>
+
+  <property>
+    <description>
+      The configured policy generator class, runs DefaultGlobalPolicy by default.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.class</name>
+    <value>org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.DefaultGlobalPolicy</value>
+  </property>
+
+  <property>
+    <description>
+      Whether or not the policy generator is running in read only (won't modify policies), default is false.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.readonly</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Which sub-clusters the policy generator should blacklist, default is none.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.blacklist</name>
+    <value></value>
+  </property>
+
It is TimelineClient 1.5 configuration whether to store active
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index ef7711412de..e8ab8b8c598 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -372,6 +373,18 @@ public SubClusterPolicyConfiguration getPolicyConfiguration(
}
}
+ /**
+ * Set a policy configuration into the state store.
+ *
+ * @param policyConf the policy configuration to set
+ * @throws YarnException if the request is invalid/fails
+ */
+ public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf)
+ throws YarnException {
+ stateStore.setPolicyConfiguration(
+ SetSubClusterPolicyConfigurationRequest.newInstance(policyConf));
+ }
+
/**
* Adds the home {@link SubClusterId} for the specified {@link ApplicationId}.
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
index 9bbb936e7b9..e83402c67a1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
@@ -61,6 +61,12 @@
test
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
org.apache.hadoop
hadoop-yarn-server-resourcemanager
@@ -72,6 +78,12 @@
test
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
org.apache.hadoop
hadoop-yarn-server-common
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
index da8a383bd75..6b0a5a43112 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
@@ -28,4 +28,8 @@
FederationStateStoreFacade getStateStoreFacade();
void setStateStoreFacade(FederationStateStoreFacade facade);
+
+ GPGPolicyFacade getPolicyFacade();
+
+ void setPolicyFacade(GPGPolicyFacade facade);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
index 3884ace9cef..bb498448fae 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
@@ -26,6 +26,7 @@
public class GPGContextImpl implements GPGContext {
private FederationStateStoreFacade facade;
+ private GPGPolicyFacade policyFacade;
@Override
public FederationStateStoreFacade getStateStoreFacade() {
@@ -38,4 +39,13 @@ public void setStateStoreFacade(
this.facade = federationStateStoreFacade;
}
+ @Override
+ public GPGPolicyFacade getPolicyFacade(){
+ return policyFacade;
+ }
+
+ @Override
+ public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){
+ policyFacade = gpgPolicyfacade;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
new file mode 100644
index 00000000000..90391cd61cc
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for the GPG Policy Generator to read and write policies
+ * into the state store. Policy specific logic is abstracted away in this
+ * class, so the PolicyGenerator can avoid dealing with policy construction,
+ * reinitialization, and serialization.
+ *
+ * There are only two exposed methods:
+ *
+ * {@link #getPolicyManager(String)}
+ * Gets the PolicyManager via queue name. Null if there is no policy
+ * configured for the specified queue. The PolicyManager can be used to
+ * extract the {@link FederationRouterPolicy} and
+ * {@link FederationAMRMProxyPolicy}, as well as any policy specific parameters
+ *
+ * {@link #setPolicyManager(FederationPolicyManager)}
+ * Sets the PolicyManager. If the policy configuration is the same, no change
+ * occurs. Otherwise, the internal cache is updated and the new configuration
+ * is written into the state store
+ *
+ * This class assumes that the GPG is the only service
+ * writing policies. Thus, the only state store reads occur the first time a
+ * queue policy is retrieved - after that, the GPG only writes to the state
+ * store.
+ *
+ * The class uses a PolicyManager cache and a SubClusterPolicyConfiguration
+ * cache. The primary use for these caches are to serve reads, and to
+ * identify when the PolicyGenerator has actually changed the policy
+ * so unnecessary state store policy writes can be avoided.
+ */
+
+public class GPGPolicyFacade {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GPGPolicyFacade.class);
+
+ private FederationStateStoreFacade stateStore;
+
+ private Map policyManagerMap;
+ private Map policyConfMap;
+
+ private boolean readOnly;
+
+ public GPGPolicyFacade(FederationStateStoreFacade stateStore,
+ Configuration conf) {
+ this.stateStore = stateStore;
+ this.policyManagerMap = new HashMap<>();
+ this.policyConfMap = new HashMap<>();
+ this.readOnly =
+ conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY,
+ YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY);
+ }
+
+ /**
+ * Provides a utility for the policy generator to read the policy manager
+ * from the state store. Because the policy generator should be the only
+ * component updating the policy, this implementation does not use the
+ * reinitialization feature.
+ *
+ * @param queueName the name of the queue we want the policy manager for.
+ * @return the policy manager responsible for the queue policy.
+ */
+ public FederationPolicyManager getPolicyManager(String queueName)
+ throws YarnException {
+ FederationPolicyManager policyManager = policyManagerMap.get(queueName);
+ // If we don't have the policy manager cached, pull configuration
+ // from the state store to create and cache it
+ if (policyManager == null) {
+ try {
+ // If we don't have the configuration cached, pull it
+ // from the stateStore
+ SubClusterPolicyConfiguration conf = policyConfMap.get(queueName);
+ if (conf == null) {
+ conf = stateStore.getPolicyConfiguration(queueName);
+ }
+ // If configuration is still null, it does not exist in the state store
+ if (conf == null) {
+ LOG.info("Read null policy for queue {}", queueName);
+ return null;
+ }
+ policyManager =
+ FederationPolicyUtils.instantiatePolicyManager(conf.getType());
+ policyManager.setQueue(queueName);
+
+ // TODO there is currently no way to cleanly deserialize a policy
+ // manager sub type from just the configuration
+ if (policyManager instanceof WeightedLocalityPolicyManager) {
+ WeightedPolicyInfo wpinfo =
+ WeightedPolicyInfo.fromByteBuffer(conf.getParams());
+ WeightedLocalityPolicyManager wlpmanager =
+ (WeightedLocalityPolicyManager) policyManager;
+ LOG.info("Updating policy for queue {} to configured weights router: "
+ + "{}, amrmproxy: {}", queueName,
+ wpinfo.getRouterPolicyWeights(),
+ wpinfo.getAMRMPolicyWeights());
+ wlpmanager.setWeightedPolicyInfo(wpinfo);
+ } else {
+ LOG.warn("Warning: FederationPolicyManager of unsupported type {}, "
+ + "initialization may be incomplete ", policyManager.getClass());
+ }
+
+ policyManagerMap.put(queueName, policyManager);
+ policyConfMap.put(queueName, conf);
+ } catch (YarnException e) {
+ LOG.error("Error reading SubClusterPolicyConfiguration from state "
+ + "store for queue: {}", queueName);
+ throw e;
+ }
+ }
+ return policyManager;
+ }
+
+ /**
+ * Provides a utility for the policy generator to write a policy manager
+ * into the state store. The facade keeps a cache and will only write into
+ * the state store if the policy configuration has changed.
+ *
+ * @param policyManager The policy manager we want to update into the state
+ * store. It contains policy information as well as
+ * the queue name we will update for.
+ */
+ public void setPolicyManager(FederationPolicyManager policyManager)
+ throws YarnException {
+ if (policyManager == null) {
+ LOG.warn("Attempting to set null policy manager");
+ return;
+ }
+ // Extract the configuration from the policy manager
+ String queue = policyManager.getQueue();
+ SubClusterPolicyConfiguration conf;
+ try {
+ conf = policyManager.serializeConf();
+ } catch (FederationPolicyInitializationException e) {
+ LOG.warn("Error serializing policy for queue {}", queue);
+ throw e;
+ }
+ if (conf == null) {
+ // State store does not currently support setting a policy back to null
+ // because it reads the queue name to set from the policy!
+ LOG.warn("Skip setting policy to null for queue {} into state store",
+ queue);
+ return;
+ }
+ // Compare with configuration cache, if different, write the conf into
+ // store and update our conf and manager cache
+ if (!confCacheEqual(queue, conf)) {
+ try {
+ if (readOnly) {
+ LOG.info("[read-only] Skipping policy update for queue {}", queue);
+ return;
+ }
+ LOG.info("Updating policy for queue {} into state store", queue);
+ stateStore.setPolicyConfiguration(conf);
+ policyConfMap.put(queue, conf);
+ policyManagerMap.put(queue, policyManager);
+ } catch (YarnException e) {
+ LOG.warn("Error writing SubClusterPolicyConfiguration to state "
+ + "store for queue: {}", queue);
+ throw e;
+ }
+ } else {
+ LOG.info("Setting unchanged policy - state store write skipped");
+ }
+ }
+
+ /**
+ * @param queue
+ * @param conf
+ * @return whether or not the conf is equal to the cached conf
+ */
+ private boolean confCacheEqual(String queue,
+ SubClusterPolicyConfiguration conf) {
+ SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue);
+ if (conf == null && cachedConf == null) {
+ return true;
+ } else if (conf != null && cachedConf != null) {
+ if (conf.equals(cachedConf)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
new file mode 100644
index 00000000000..70e3588ac3c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+
+/**
+ * GPGUtils contains utility functions for the GPG.
+ *
+ */
+public final class GPGUtils {
+
+ // hide constructor
+ private GPGUtils() {
+ }
+
+  /**
+   * Performs an invocation of the remote RMWebService.
+   */
+ public static T invokeRMWebService(Configuration conf, String webAddr,
+ String path, final Class returnType) {
+ Client client = Client.create();
+ T obj = null;
+
+ WebResource webResource = client.resource(webAddr);
+ ClientResponse response = webResource.path("ws/v1/cluster").path(path)
+ .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+ if (response.getStatus() == HttpServletResponse.SC_OK) {
+ obj = response.getEntity(returnType);
+ } else {
+ throw new YarnRuntimeException("Bad response from remote web service: "
+ + response.getStatus());
+ }
+ return obj;
+ }
+
+ /**
+ * Creates a default weighting.
+ */
+ public static Map createUniformWeights(
+ Set ids) {
+ Map weights =
+ new HashMap<>();
+ for(SubClusterId id : ids) {
+ weights.put(new SubClusterIdInfo(id), 1.0f);
+ }
+ return weights;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index f6cfba027db..f9d84a5680c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGeneratorService;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +59,7 @@
// Federation Variables
private GPGContext gpgContext;
+ private PolicyGeneratorService policyGeneratorService;
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
@@ -73,6 +75,12 @@ protected void serviceInit(Configuration conf) throws Exception {
// Set up the context
this.gpgContext
.setStateStoreFacade(FederationStateStoreFacade.getInstance());
+ this.gpgContext
+ .setPolicyFacade(new GPGPolicyFacade(
+ this.gpgContext.getStateStoreFacade(), conf));
+
+ this.policyGeneratorService = new PolicyGeneratorService(this.gpgContext);
+ addService(this.policyGeneratorService);
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/DefaultGlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/DefaultGlobalPolicy.java
new file mode 100644
index 00000000000..6a3446e8089
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/DefaultGlobalPolicy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Default policy that does nothing.
+ */
+public class DefaultGlobalPolicy extends GlobalPolicy{
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultGlobalPolicy.class);
+
+ @Override
+ public FederationPolicyManager updatePolicy(String queueName,
+ Map> clusterInfo,
+ FederationPolicyManager manager) {
+ LOG.info("Updating policy for queue {}", queueName);
+ return null;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
new file mode 100644
index 00000000000..e698e872f60
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import java.util.Map;
+
+/**
+ * This interface defines the plug-able policy that the PolicyGenerator uses
+ * to update policies into the state store.
+ */
+
+public abstract class GlobalPolicy implements Configurable {
+
+ private Configuration conf;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Given a map of class type to string, write into the map the object type
+ * and RM path to request it from - the framework will query these paths
+ * and provide the objects to the policy. Delegating this responsibility to
+ * the PolicyGenerator enables us avoid duplicate calls to the same
+ * endpoints as the GlobalPolicy is invoked once per queue.
+ */
+ protected void registerPaths(Map paths) {
+ // Default register nothing
+ }
+
+ /**
+ * Given a queue, cluster metrics, and policy manager, update the policy
+ * to account for the cluster status. This method defines the policy generator
+ * behavior.
+ *
+ * @param queueName name of the queue
+ * @param clusterInfo subClusterId map to cluster information about the
+ * sub cluster used to make policy decisions
+ * @param manager the FederationPolicyManager for the queue's existing
+ * policy the manager may be null, in which case the policy
+ * will need to be created
+ * @return policy manager that handles the updated (or created) policy
+ */
+ protected abstract FederationPolicyManager updatePolicy(String queueName,
+ Map> clusterInfo,
+ FederationPolicyManager manager);
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
new file mode 100644
index 00000000000..ec1883bab97
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The PolicyGenerator runs periodically and updates the policy configuration
+ * for each queue into the state store. This is a plug-able component -
+ * subclasses should override createPolicy and updatePolicy to define
+ * specific behavior.
+ */
+
+public class PolicyGenerator implements Runnable, Configurable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PolicyGenerator.class);
+
+  private GPGContext gpgContext;
+  private Configuration conf;
+
+  // Information request map: object type to the RM REST endpoint path
+  private Map<Class, String> pathMap = new HashMap<>();
+
+  // Global policy instance
+  @VisibleForTesting
+  protected GlobalPolicy policy;
+
+  /**
+   * The PolicyGenerator periodically reads sub cluster load and updates
+   * policies into the state store.
+   */
+  public PolicyGenerator() {
+  }
+
+  public void init(GPGContext context) {
+    this.gpgContext = context;
+    LOG.info("Initialized PolicyGenerator");
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    this.policy = FederationStateStoreFacade
+        .createInstance(conf, YarnConfiguration.GPG_GLOBAL_POLICY_CLASS,
+            YarnConfiguration.DEFAULT_GPG_GLOBAL_POLICY_CLASS,
+            GlobalPolicy.class);
+    policy.setConf(conf);
+    policy.registerPaths(pathMap);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public final void run() {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters;
+    try {
+      activeSubClusters = gpgContext.getStateStoreFacade().getSubClusters(true);
+    } catch (YarnException e) {
+      LOG.error("Error retrieving active sub-clusters", e);
+      return;
+    }
+
+    // Parse the scheduler information from all the SCs
+    Map<SubClusterId, SchedulerInfo> schedInfo =
+        getSchedulerInfo(activeSubClusters);
+
+    // Extract and enforce that all the schedulers have matching type
+    Set<String> queueNames = extractQueues(schedInfo);
+
+    // Remove black listed sub clusters
+    activeSubClusters.keySet().removeAll(getBlackList());
+    LOG.info("Active non-blacklist sub-clusters: {}",
+        activeSubClusters.keySet());
+
+    // Get cluster metrics information from non black listed RMs - later used
+    // to evaluate sub cluster load
+    Map<SubClusterId, Map<Class, Object>> clusterInfo =
+        getInfos(activeSubClusters);
+
+    // Update into the federation state store
+    for (String queueName : queueNames) {
+      // Retrieve the manager from the policy facade
+      FederationPolicyManager manager;
+      try {
+        manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName);
+      } catch (YarnException e) {
+        LOG.error("GetPolicy for queue {} failed", queueName, e);
+        continue;
+      }
+      LOG.info("Updating policy for queue {}", queueName);
+      manager = policy.updatePolicy(queueName, clusterInfo, manager);
+      try {
+        this.gpgContext.getPolicyFacade().setPolicyManager(manager);
+      } catch (YarnException e) {
+        LOG.error("SetPolicy for queue {} failed", queueName, e);
+      }
+    }
+  }
+
+  /**
+   * Helper to retrieve metrics from the RM REST endpoints.
+   * @param activeSubClusters a map of active sub cluster IDs to info
+   * @return sub cluster id to map of object type to the fetched object
+   */
+  @VisibleForTesting
+  protected Map<SubClusterId, Map<Class, Object>> getInfos(
+      Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+
+    Map<SubClusterId, Map<Class, Object>> clusterInfo = new HashMap<>();
+    for (SubClusterInfo sci : activeSubClusters.values()) {
+      for (Map.Entry<Class, String> e : this.pathMap.entrySet()) {
+        if (!clusterInfo.containsKey(sci.getSubClusterId())) {
+          clusterInfo.put(sci.getSubClusterId(), new HashMap<Class, Object>());
+        }
+        Object ret = GPGUtils
+            .invokeRMWebService(conf, sci.getRMWebServiceAddress(),
+                e.getValue(), e.getKey());
+        clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
+      }
+    }
+
+    return clusterInfo;
+  }
+
+  /**
+   * Helper to retrieve SchedulerInfos.
+   * @param activeSubClusters a map of active sub cluster IDs to info
+   * @return sub cluster id to its scheduler info
+   */
+  @VisibleForTesting
+  protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
+      Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+    Map<SubClusterId, SchedulerInfo> schedInfo =
+        new HashMap<>();
+    for (SubClusterInfo sci : activeSubClusters.values()) {
+      // TODO parallel
+      SchedulerTypeInfo sti = GPGUtils
+          .invokeRMWebService(conf, sci.getRMWebServiceAddress(),
+              RMWSConsts.SCHEDULER, SchedulerTypeInfo.class);
+      schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
+    }
+    return schedInfo;
+  }
+
+  /**
+   * Helper to get a set of blacklisted sub cluster Ids from configuration.
+   */
+  private Set<SubClusterId> getBlackList() {
+    String blackListParam =
+        conf.get(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST, "");
+    Set<SubClusterId> blackList = new HashSet<>();
+    for (String id : blackListParam.split(",")) {
+      blackList.add(SubClusterId.newInstance(id));
+    }
+    return blackList;
+  }
+
+  /**
+   * Given the scheduler information for all RMs, extract the union of
+   * queue names - right now we only consider instances of capacity scheduler.
+   *
+   * @param schedInfo the scheduler information
+   * @return the set of queue names found across all sub clusters
+   */
+  private Set<String> extractQueues(
+      Map<SubClusterId, SchedulerInfo> schedInfo) {
+    Set<String> queueNames = new HashSet<>();
+    for (Map.Entry<SubClusterId, SchedulerInfo> entry : schedInfo.entrySet()) {
+      // TODO workaround - eventually the RMs should write
+      // queue structure into the state store.
+      if (entry.getValue() instanceof CapacitySchedulerInfo) {
+        // Flatten the queue structure and get only non leaf queues
+        queueNames.addAll(flattenQueue((CapacitySchedulerInfo) entry.getValue())
+            .get(CapacitySchedulerQueueInfo.class));
+      } else {
+        LOG.warn("Skipping sub cluster {}, not configured with capacity "
+            + "scheduler", entry.getKey());
+      }
+    }
+    return queueNames;
+  }
+
+  // Helpers to flatten the queue structure into a multimap of
+  // queue type to set of queue names
+  private Map<Class, Set<String>> flattenQueue(CapacitySchedulerInfo csi) {
+    Map<Class, Set<String>> flattened = new HashMap<>();
+    addOrAppend(flattened, csi.getClass(), csi.getQueueName());
+    for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) {
+      flattenQueue(csqi, flattened);
+    }
+    return flattened;
+  }
+
+  private void flattenQueue(CapacitySchedulerQueueInfo csi,
+      Map<Class, Set<String>> flattened) {
+    addOrAppend(flattened, csi.getClass(), csi.getQueueName());
+    if (csi.getQueues() != null) {
+      for (CapacitySchedulerQueueInfo csqi : csi.getQueues()
+          .getQueueInfoList()) {
+        flattenQueue(csqi, flattened);
+      }
+    }
+  }
+
+  private <K, V> void addOrAppend(Map<K, Set<V>> multimap, K key, V value) {
+    if (!multimap.containsKey(key)) {
+      multimap.put(key, new HashSet<V>());
+    }
+    multimap.get(key).add(value);
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGeneratorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGeneratorService.java
new file mode 100644
index 00000000000..9dbd0418417
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGeneratorService.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The policy generator is one of the GPG's services that periodically checks
+ * sub cluster load and writes updated policies into the state store to load
+ * balance the cluster.
+ */
+public class PolicyGeneratorService extends AbstractService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(PolicyGeneratorService.class);
+
+  private GPGContext gpgContext;
+  private long generatorIntervalMillis;
+  private PolicyGenerator policyGenerator;
+  private ScheduledExecutorService scheduledExecutorService;
+
+  public PolicyGeneratorService(GPGContext gpgContext) {
+    super(PolicyGeneratorService.class.getName());
+    this.gpgContext = gpgContext;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    // Load parameters from the configuration
+    this.generatorIntervalMillis = conf.getLong(
+        YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS);
+
+    // Set up the runnable
+    this.policyGenerator = new PolicyGenerator();
+    this.policyGenerator.init(gpgContext);
+    this.policyGenerator.setConf(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+
+    // Begin the policy generator scheduled execution
+    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+    this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator,
+        0, this.generatorIntervalMillis, TimeUnit.MILLISECONDS);
+    LOG.info("Started federation policy generator with interval: "
+        + DurationFormatUtils.formatDurationISO(this.generatorIntervalMillis));
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Stop our own scheduled execution first, then delegate to the parent
+    try {
+      if (this.scheduledExecutorService != null
+          && !this.scheduledExecutorService.isShutdown()) {
+        this.scheduledExecutorService.shutdown();
+        LOG.info("Stopped federation policy generator");
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to shutdown ScheduledExecutorService", e);
+      throw e;
+    }
+    super.serviceStop();
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
new file mode 100644
index 00000000000..a8059f8f975
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.commons.math3.optim.nonlinear.vector.Weight;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Simple policy that generates and updates uniform weighted locality
+ * policies.
+ */
+public class UniformWeightedLocalityGlobalPolicy extends GlobalPolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UniformWeightedLocalityGlobalPolicy.class);
+
+  @Override
+  protected FederationPolicyManager updatePolicy(String queueName,
+      Map<SubClusterId, Map<Class, Object>> clusterInfo,
+      FederationPolicyManager currentManager) {
+    if (currentManager == null) {
+      // Set uniform weights for all sub clusters
+      LOG.info("Creating uniform weighted policy queue {}", queueName);
+      WeightedLocalityPolicyManager manager =
+          new WeightedLocalityPolicyManager();
+      manager.setQueue(queueName);
+      manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+          GPGUtils.createUniformWeights(clusterInfo.keySet()));
+      manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+          GPGUtils.createUniformWeights(clusterInfo.keySet()));
+      currentManager = manager;
+    }
+    if (currentManager instanceof WeightedLocalityPolicyManager) {
+      LOG.info("Updating policy for queue {} to default weights", queueName);
+      WeightedLocalityPolicyManager wlpmanager =
+          (WeightedLocalityPolicyManager) currentManager;
+      wlpmanager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+          GPGUtils.createUniformWeights(clusterInfo.keySet()));
+      wlpmanager.getWeightedPolicyInfo().setRouterPolicyWeights(
+          GPGUtils.createUniformWeights(clusterInfo.keySet()));
+    } else {
+      // Unexpected manager type - report the type we actually expected
+      LOG.warn("Policy for queue {} is of type {}, expected {}", queueName,
+          currentManager.getClass(), WeightedLocalityPolicyManager.class);
+    }
+    return currentManager;
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
new file mode 100644
index 00000000000..ef93c9ad2a0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
new file mode 100644
index 00000000000..45ee266ee36
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Unit test for GPG Policy Facade.
+ */
+public class TestGPGPolicyFacade {
+
+  private Configuration conf;
+  private FederationStateStore stateStore;
+  private FederationStateStoreFacade facade =
+      FederationStateStoreFacade.getInstance();
+  private GPGPolicyFacade policyFacade;
+
+  private Set<SubClusterId> subClusterIds;
+
+  private SubClusterPolicyConfiguration testConf;
+
+  private static final String TEST_QUEUE = "test-queue";
+
+  public TestGPGPolicyFacade() {
+    conf = new Configuration();
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+    subClusterIds = new HashSet<>();
+    subClusterIds.add(SubClusterId.newInstance("sc0"));
+    subClusterIds.add(SubClusterId.newInstance("sc1"));
+    subClusterIds.add(SubClusterId.newInstance("sc2"));
+  }
+
+  @Before
+  public void setUp() throws IOException, YarnException {
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+    facade.reinitialize(stateStore, conf);
+    policyFacade = new GPGPolicyFacade(facade, conf);
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    // Add a test policy for test queue
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    testConf = manager.serializeConf();
+    stateStore.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
+        .newInstance(testConf));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stateStore.close();
+    stateStore = null;
+  }
+
+  @Test
+  public void testGetPolicy() throws YarnException {
+    WeightedLocalityPolicyManager manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE);
+    Assert.assertEquals(testConf, manager.serializeConf());
+  }
+
+  @Test
+  public void testSetNewPolicy() throws YarnException {
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    manager.setQueue(TEST_QUEUE + 0);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    SubClusterPolicyConfiguration policyConf = manager.serializeConf();
+    policyFacade.setPolicyManager(manager);
+
+    manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE + 0);
+    Assert.assertEquals(policyConf, manager.serializeConf());
+  }
+
+  @Test
+  public void testOverwritePolicy() throws YarnException {
+    subClusterIds.add(SubClusterId.newInstance("sc3"));
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    SubClusterPolicyConfiguration policyConf = manager.serializeConf();
+    policyFacade.setPolicyManager(manager);
+
+    manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE);
+    Assert.assertEquals(policyConf, manager.serializeConf());
+  }
+
+  @Test
+  public void testWriteCache() throws YarnException {
+    stateStore = mock(MemoryFederationStateStore.class);
+    facade.reinitialize(stateStore, conf);
+    when(stateStore.getPolicyConfiguration(Matchers.any(
+        GetSubClusterPolicyConfigurationRequest.class))).thenReturn(
+        GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+    policyFacade = new GPGPolicyFacade(facade, conf);
+
+    // Query once to fill the cache
+    FederationPolicyManager manager = policyFacade.getPolicyManager(TEST_QUEUE);
+    // State store should be contacted once
+    verify(stateStore, times(1)).getPolicyConfiguration(
+        Matchers.any(GetSubClusterPolicyConfigurationRequest.class));
+
+    // If we set the same policy, the state store should be untouched
+    policyFacade.setPolicyManager(manager);
+    verify(stateStore, times(0)).setPolicyConfiguration(
+        Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
+  }
+
+  @Test
+  public void testReadOnly() throws YarnException {
+    conf.setBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, true);
+    stateStore = mock(MemoryFederationStateStore.class);
+    facade.reinitialize(stateStore, conf);
+    when(stateStore.getPolicyConfiguration(Matchers.any(
+        GetSubClusterPolicyConfigurationRequest.class))).thenReturn(
+        GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+    policyFacade = new GPGPolicyFacade(facade, conf);
+
+    // If we set a policy, the state store should be untouched
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    // Add a test policy for test queue
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    policyFacade.setPolicyManager(manager);
+    verify(stateStore, times(0)).setPolicyConfiguration(
+        Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
+  }
+
+}
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
new file mode 100644
index 00000000000..fe8903edccd
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGPolicyFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for GPG Policy Generator.
+ */
+public class TestPolicyGenerator {
+
+ private static final int NUM_SC = 3;
+
+ private Configuration conf;
+ private FederationStateStore stateStore;
+ private FederationStateStoreFacade facade =
+ FederationStateStoreFacade.getInstance();
+
+ private List<SubClusterId> subClusterIds;
+ private Map<SubClusterId, SubClusterInfo> subClusterInfos;
+ private Map<SubClusterId, Map<Class, Object>> clusterInfos;
+ private Map<SubClusterId, SchedulerInfo> schedulerInfos;
+
+ private GPGContext gpgContext;
+
+ private PolicyGenerator policyGenerator;
+
+ public TestPolicyGenerator() {
+ conf = new Configuration();
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+ gpgContext = new GPGContextImpl();
+ gpgContext.setPolicyFacade(new GPGPolicyFacade(facade, conf));
+ gpgContext.setStateStoreFacade(facade);
+ }
+
+ @Before
+ public void setUp() throws IOException, YarnException, JAXBException {
+ subClusterIds = new ArrayList<>();
+ subClusterInfos = new HashMap<>();
+ clusterInfos = new HashMap<>();
+ schedulerInfos = new HashMap<>();
+
+ CapacitySchedulerInfo sti1 =
+ readJSON("src/test/resources/schedulerInfo1.json",
+ CapacitySchedulerInfo.class);
+ CapacitySchedulerInfo sti2 =
+ readJSON("src/test/resources/schedulerInfo2.json",
+ CapacitySchedulerInfo.class);
+
+ // Set up sub clusters
+ for (int i = 0; i < NUM_SC; ++i) {
+ // Sub cluster Id
+ SubClusterId id = SubClusterId.newInstance("sc" + i);
+ subClusterIds.add(id);
+
+ // Sub cluster info
+ SubClusterInfo cluster = SubClusterInfo
+ .newInstance(id, "amrm:" + i, "clientrm:" + i, "rmadmin:" + i,
+ "rmweb:" + i, SubClusterState.SC_RUNNING, 0, "");
+ subClusterInfos.put(id, cluster);
+
+ // Cluster metrics info
+ ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo();
+ metricsInfo.setAppsPending(2000);
+ if (!clusterInfos.containsKey(id)) {
+ clusterInfos.put(id, new HashMap<Class, Object>());
+ }
+ clusterInfos.get(id).put(ClusterMetricsInfo.class, metricsInfo);
+
+ schedulerInfos.put(id, sti1);
+ }
+
+ // Change one of the sub cluster schedulers
+ schedulerInfos.put(subClusterIds.get(0), sti2);
+
+ stateStore = mock(FederationStateStore.class);
+ when(stateStore.getSubClusters((GetSubClustersInfoRequest) any()))
+ .thenReturn(GetSubClustersInfoResponse.newInstance(
+ new ArrayList<>(subClusterInfos.values())));
+ facade.reinitialize(stateStore, conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stateStore.close();
+ stateStore = null;
+ }
+
+ private <T> T readJSON(String pathname, Class<T> classy)
+ throws IOException, JAXBException {
+
+ JSONJAXBContext jc =
+ new JSONJAXBContext(JSONConfiguration.mapped().build(), classy);
+ JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
+ String contents = new String(Files.readAllBytes(Paths.get(pathname)));
+ return unmarshaller.unmarshalFromJSON(new StringReader(contents), classy);
+
+ }
+
+ @Test
+ public void testPolicyGenerator() throws YarnException {
+ policyGenerator = new TestablePolicyGenerator();
+ policyGenerator.init(gpgContext);
+ policyGenerator.setConf(conf);
+ policyGenerator.policy = mock(GlobalPolicy.class);
+ policyGenerator.run();
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy("default", clusterInfos, null);
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy("default2", clusterInfos, null);
+ }
+
+ @Test
+ public void testBlacklist() throws YarnException {
+ conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
+ subClusterIds.get(0).toString());
+ Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
+ new HashMap<>(clusterInfos);
+ blacklistedCMI.remove(subClusterIds.get(0));
+ policyGenerator = new TestablePolicyGenerator();
+ policyGenerator.init(gpgContext);
+ policyGenerator.setConf(conf);
+ policyGenerator.policy = mock(GlobalPolicy.class);
+ policyGenerator.run();
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy("default", blacklistedCMI, null);
+ verify(policyGenerator.policy, times(0))
+ .updatePolicy("default", clusterInfos, null);
+ }
+
+ @Test
+ public void testBlacklistTwo() throws YarnException {
+ conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
+ subClusterIds.get(0).toString() + "," + subClusterIds.get(1)
+ .toString());
+ Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
+ new HashMap<>(clusterInfos);
+ blacklistedCMI.remove(subClusterIds.get(0));
+ blacklistedCMI.remove(subClusterIds.get(1));
+ policyGenerator = new TestablePolicyGenerator();
+ policyGenerator.init(gpgContext);
+ policyGenerator.setConf(conf);
+ policyGenerator.policy = mock(GlobalPolicy.class);
+ policyGenerator.run();
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy("default", blacklistedCMI, null);
+ verify(policyGenerator.policy, times(0))
+ .updatePolicy("default", clusterInfos, null);
+ }
+
+ @Test
+ public void testExistingPolicy() throws YarnException {
+ WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
+ // Add a test policy for test queue
+ manager.setQueue("default");
+ manager.getWeightedPolicyInfo().setAMRMPolicyWeights(GPGUtils
+ .createUniformWeights(new HashSet<>(subClusterIds)));
+ manager.getWeightedPolicyInfo().setRouterPolicyWeights(GPGUtils
+ .createUniformWeights(new HashSet<>(subClusterIds)));
+ SubClusterPolicyConfiguration testConf = manager.serializeConf();
+ when(stateStore.getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest.newInstance("default")))
+ .thenReturn(
+ GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+
+ policyGenerator = new TestablePolicyGenerator();
+ policyGenerator.init(gpgContext);
+ policyGenerator.setConf(conf);
+ policyGenerator.policy = mock(GlobalPolicy.class);
+ policyGenerator.run();
+
+ ArgumentCaptor<FederationPolicyManager> argCaptor =
+ ArgumentCaptor.forClass(FederationPolicyManager.class);
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy(eq("default"), eq(clusterInfos), argCaptor.capture());
+ assertEquals(argCaptor.getValue().getClass(), manager.getClass());
+ assertEquals(argCaptor.getValue().serializeConf(), manager.serializeConf());
+ }
+
+ @Test
+ public void testCallRM() {
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+
+ final String a = CapacitySchedulerConfiguration.ROOT + ".a";
+ final String b = CapacitySchedulerConfiguration.ROOT + ".b";
+ final String a1 = a + ".a1";
+ final String a2 = a + ".a2";
+ final String b1 = b + ".b1";
+ final String b2 = b + ".b2";
+ final String b3 = b + ".b3";
+ float aCapacity = 10.5f;
+ float bCapacity = 89.5f;
+ float a1Capacity = 30;
+ float a2Capacity = 70;
+ float b1Capacity = 79.2f;
+ float b2Capacity = 0.8f;
+ float b3Capacity = 20;
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"a", "b"});
+
+ csConf.setCapacity(a, aCapacity);
+ csConf.setCapacity(b, bCapacity);
+
+ // Define 2nd-level queues
+ csConf.setQueues(a, new String[] {"a1", "a2"});
+ csConf.setCapacity(a1, a1Capacity);
+ csConf.setUserLimitFactor(a1, 100.0f);
+ csConf.setCapacity(a2, a2Capacity);
+ csConf.setUserLimitFactor(a2, 100.0f);
+
+ csConf.setQueues(b, new String[] {"b1", "b2", "b3"});
+ csConf.setCapacity(b1, b1Capacity);
+ csConf.setUserLimitFactor(b1, 100.0f);
+ csConf.setCapacity(b2, b2Capacity);
+ csConf.setUserLimitFactor(b2, 100.0f);
+ csConf.setCapacity(b3, b3Capacity);
+ csConf.setUserLimitFactor(b3, 100.0f);
+
+ YarnConfiguration rmConf = new YarnConfiguration(csConf);
+
+ ResourceManager resourceManager = new ResourceManager();
+ rmConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ resourceManager.init(rmConf);
+ resourceManager.start();
+
+ String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
+ SchedulerTypeInfo sti = GPGUtils
+ .invokeRMWebService(conf, rmAddress, RMWSConsts.SCHEDULER,
+ SchedulerTypeInfo.class);
+
+ Assert.assertNotNull(sti);
+ }
+
+ /**
+ * Testable policy generator overrides the methods that communicate
+ * with the RM REST endpoint, allowing us to inject faked responses.
+ */
+ class TestablePolicyGenerator extends PolicyGenerator {
+
+ @Override
+ protected Map<SubClusterId, Map<Class, Object>> getInfos(
+ Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+ Map<SubClusterId, Map<Class, Object>> ret = new HashMap<>();
+ for (SubClusterId id : activeSubClusters.keySet()) {
+ if (!ret.containsKey(id)) {
+ ret.put(id, new HashMap<Class, Object>());
+ }
+ ret.get(id).put(ClusterMetricsInfo.class,
+ clusterInfos.get(id).get(ClusterMetricsInfo.class));
+ }
+ return ret;
+ }
+
+ @Override
+ protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
+ Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+ Map<SubClusterId, SchedulerInfo> ret =
+ new HashMap<SubClusterId, SchedulerInfo>();
+ for (SubClusterId id : activeSubClusters.keySet()) {
+ ret.put(id, schedulerInfos.get(id));
+ }
+ return ret;
+ }
+ }
+}
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
new file mode 100644
index 00000000000..3ad45945f96
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
@@ -0,0 +1,134 @@
+{
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "queueName": "root",
+ "queues": {
+ "queue": [
+ {
+ "type": "capacitySchedulerLeafQueueInfo",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "absoluteCapacity": 100.0,
+ "absoluteMaxCapacity": 100.0,
+ "absoluteUsedCapacity": 0.0,
+ "numApplications": 484,
+ "queueName": "default",
+ "state": "RUNNING",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "hideReservationQueues": false,
+ "nodeLabels": [
+ "*"
+ ],
+ "numActiveApplications": 484,
+ "numPendingApplications": 0,
+ "numContainers": 0,
+ "maxApplications": 10000,
+ "maxApplicationsPerUser": 10000,
+ "userLimit": 100,
+ "users": {
+ "user": [
+ {
+ "username": "Default",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "numPendingApplications": 0,
+ "numActiveApplications": 468,
+ "AMResourceUsed": {
+ "memory": 30191616,
+ "vCores": 468
+ },
+ "userResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ }
+ }
+ ]
+ },
+ "userLimitFactor": 1.0,
+ "AMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "usedAMResource": {
+ "memory": 30388224,
+ "vCores": 532
+ },
+ "userAMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "preemptionDisabled": true
+ }
+ ]
+ },
+ "health": {
+ "lastrun": 1517951638085,
+ "operationsInfo": {
+ "entry": {
+ "key": "last-allocation",
+ "value": {
+ "nodeId": "node0:0",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-reservation",
+ "value": {
+ "nodeId": "node0:1",
+ "containerId": "container_e61477_1517879828320_0249_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-release",
+ "value": {
+ "nodeId": "node0:2",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-preemption",
+ "value": {
+ "nodeId": "N/A",
+ "containerId": "N/A",
+ "queue": "N/A"
+ }
+ }
+ },
+ "lastRunDetails": [
+ {
+ "operation": "releases",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "allocations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "reservations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json
new file mode 100644
index 00000000000..069220ae7ef
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json
@@ -0,0 +1,197 @@
+ {
+ "type": "capacityScheduler",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "queueName": "root",
+ "queues": {
+ "queue": [
+ {
+ "type": "capacitySchedulerLeafQueueInfo",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "absoluteCapacity": 100.0,
+ "absoluteMaxCapacity": 100.0,
+ "absoluteUsedCapacity": 0.0,
+ "numApplications": 484,
+ "queueName": "default",
+ "state": "RUNNING",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "hideReservationQueues": false,
+ "nodeLabels": [
+ "*"
+ ],
+ "numActiveApplications": 484,
+ "numPendingApplications": 0,
+ "numContainers": 0,
+ "maxApplications": 10000,
+ "maxApplicationsPerUser": 10000,
+ "userLimit": 100,
+ "users": {
+ "user": [
+ {
+ "username": "Default",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "numPendingApplications": 0,
+ "numActiveApplications": 468,
+ "AMResourceUsed": {
+ "memory": 30191616,
+ "vCores": 468
+ },
+ "userResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ }
+ }
+ ]
+ },
+ "userLimitFactor": 1.0,
+ "AMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "usedAMResource": {
+ "memory": 30388224,
+ "vCores": 532
+ },
+ "userAMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "preemptionDisabled": true
+ },
+ {
+ "type": "capacitySchedulerLeafQueueInfo",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "absoluteCapacity": 100.0,
+ "absoluteMaxCapacity": 100.0,
+ "absoluteUsedCapacity": 0.0,
+ "numApplications": 484,
+ "queueName": "default2",
+ "state": "RUNNING",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "hideReservationQueues": false,
+ "nodeLabels": [
+ "*"
+ ],
+ "numActiveApplications": 484,
+ "numPendingApplications": 0,
+ "numContainers": 0,
+ "maxApplications": 10000,
+ "maxApplicationsPerUser": 10000,
+ "userLimit": 100,
+ "users": {
+ "user": [
+ {
+ "username": "Default",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "numPendingApplications": 0,
+ "numActiveApplications": 468,
+ "AMResourceUsed": {
+ "memory": 30191616,
+ "vCores": 468
+ },
+ "userResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ }
+ }
+ ]
+ },
+ "userLimitFactor": 1.0,
+ "AMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "usedAMResource": {
+ "memory": 30388224,
+ "vCores": 532
+ },
+ "userAMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "preemptionDisabled": true
+ }
+ ]
+ },
+ "health": {
+ "lastrun": 1517951638085,
+ "operationsInfo": {
+ "entry": {
+ "key": "last-allocation",
+ "value": {
+ "nodeId": "node0:0",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-reservation",
+ "value": {
+ "nodeId": "node0:1",
+ "containerId": "container_e61477_1517879828320_0249_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-release",
+ "value": {
+ "nodeId": "node0:2",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-preemption",
+ "value": {
+ "nodeId": "N/A",
+ "containerId": "N/A",
+ "queue": "N/A"
+ }
+ }
+ },
+ "lastRunDetails": [
+ {
+ "operation": "releases",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "allocations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "reservations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ }
+ ]
+ }
+ }
+