diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 531200e..b2a9c56 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2523,6 +2523,12 @@ public static boolean isAclEnabled(Configuration conf) { public static final String FEDERATION_MACHINE_LIST = FEDERATION_PREFIX + "machine-list"; + public static final String FEDERATION_CLUSTER_RESOLVER_CLASS = + NM_PREFIX + "federation-cluster-resolver.class"; + public static final String DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS = + "org.apache.hadoop.yarn.server.federation.resolver." + + "DefaultSubClusterResolverImpl"; + public static final String DEFAULT_FEDERATION_POLICY_KEY = "*"; public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java index 6b4f60c..bccff2d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java @@ -21,8 +21,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import java.util.HashMap; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.Map; /** @@ -31,9 +31,9 @@ */ public abstract class AbstractSubClusterResolver implements SubClusterResolver { private Map nodeToSubCluster = - new HashMap(); + new ConcurrentHashMap(); private Map> rackToSubClusters = - new HashMap>(); + new ConcurrentHashMap>(); @Override public SubClusterId getSubClusterForNode(String nodename) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 9b794de..0a10883 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; @@ -90,6 +91,7 @@ private int cacheTimeToLive; private Configuration conf; private Cache cache; + private SubClusterResolver subclusterResolver; private FederationStateStoreFacade() { initializeFacadeInternal(new Configuration()); @@ -104,6 +106,12 @@ private void initializeFacadeInternal(Configuration config) { FederationStateStore.class, createRetryPolicy(conf)); this.stateStore.init(conf); + this.subclusterResolver = createInstance(conf, + YarnConfiguration.FEDERATION_CLUSTER_RESOLVER_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS, + SubClusterResolver.class); + this.subclusterResolver.load(); + initCache(); } catch (YarnException ex) { @@ -348,6 +356,15 @@ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) } /** + * Get the singleton instance of SubClusterResolver. + * + * @return SubClusterResolver instance + */ + public SubClusterResolver getSubClusterResolver() { + return this.subclusterResolver; + } + + /** * Helper method to create instances of Object using the class name defined in * the configuration object. The instances creates {@link RetryProxy} using * the specific {@link RetryPolicy}. @@ -359,23 +376,39 @@ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) * @param retryPolicy the policy for retrying method call failures * @return a retry proxy for the specified interface */ - @SuppressWarnings("unchecked") public static Object createRetryInstance(Configuration conf, String configuredClassName, String defaultValue, Class type, RetryPolicy retryPolicy) { + return RetryProxy.create(type, + createInstance(conf, configuredClassName, defaultValue, type), + retryPolicy); + } + + /** + * Helper method to create instances of Object using the class name specified + * in the configuration object. + * + * @param conf the yarn configuration + * @param configuredClassName the configuration provider key + * @param defaultValue the default implementation class + * @param type the required interface/base class + * @return the instances created + */ + @SuppressWarnings("unchecked") + public static T createInstance(Configuration conf, + String configuredClassName, String defaultValue, Class type) { + String className = conf.get(configuredClassName, defaultValue); try { Class clusterResolverClass = conf.getClassByName(className); if (type.isAssignableFrom(clusterResolverClass)) { - return RetryProxy.create(type, - (T) ReflectionUtils.newInstance(clusterResolverClass, conf), - retryPolicy); + return (T) ReflectionUtils.newInstance(clusterResolverClass, conf); } else { - throw new YarnRuntimeException( - "Class: " + className + " not instance of " + type.getSimpleName()); + throw new YarnRuntimeException("Class: " + className + + " not instance of " + type.getCanonicalName()); } - } catch (Exception e) { + } catch (ClassNotFoundException e) { throw new YarnRuntimeException("Could not instantiate : " + className, e); } }