diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8aa136dc522..cc272ee8b07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -38,6 +38,8 @@
 import org.apache.hadoop.util.BasicDiskValidator;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.discovery.ConfigurationBasedSubclusterDiscovery;
+import org.apache.hadoop.yarn.discovery.SubclusterDiscovery;
 
 @Public
 @Evolving
@@ -3644,6 +3646,17 @@ public static boolean areNodeLabelsEnabled(
   public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD =
       "/usr/bin/numactl";
 
+  /** Class to discover subclusters. */
+  public static final String SUBCLUSTER_DISCOVERY_CLASS =
+      "yarn.subcluster.discovery.class";
+  public static final Class<? extends SubclusterDiscovery>
+      SUBCLUSTER_DISCOVERY_CLASS_DEFAULT =
+          ConfigurationBasedSubclusterDiscovery.class;
+  public static final String SUBCLUSTER_DISCOVERY_ZK_PARENT_PATH =
+      "yarn.subcluster.discovery.zk.parent-path";
+  public static final String DEFAULT_SUBCLUSTER_DISCOVERY_ZK_PARENT_PATH =
+      "/subclusterdiscovery";
+
   public YarnConfiguration() {
     super();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index f7cb47a9dc8..97b09b6284f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.discovery.SubclusterDiscovery;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 
@@ -168,6 +169,9 @@ public T run() {
           YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
     }
 
+    SubclusterDiscovery discovery = SubclusterDiscovery.get(conf);
+    discovery.populateRMConf(conf);
+
     RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
         conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
             defaultProviderClass, RMFailoverProxyProvider.class), conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/ConfigurationBasedSubclusterDiscovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/ConfigurationBasedSubclusterDiscovery.java
new file mode 100644
index 00000000000..cbfeb7314f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/ConfigurationBasedSubclusterDiscovery.java
@@ -0,0 +1,96 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.discovery;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_ADMIN_PORT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_PORT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_ADMIN_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_HA_IDS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_SCHEDULER_ADDRESS;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Discover the subcluster the NodeManager belongs to using the local
+ * configuration.
+ */
+public class ConfigurationBasedSubclusterDiscovery extends SubclusterDiscovery {
+
+  @Override
+  public Map<String, InetSocketAddress> getRpcAddresses() throws IOException {
+    return getAddresses(RM_ADDRESS, DEFAULT_RM_ADDRESS, DEFAULT_RM_PORT);
+  }
+
+  @Override
+  public Map<String, InetSocketAddress> getAdminAddresses()
+      throws IOException {
+    return getAddresses(RM_ADMIN_ADDRESS,
+        DEFAULT_RM_ADMIN_ADDRESS, DEFAULT_RM_ADMIN_PORT);
+  }
+
+  @Override
+  public Map<String, InetSocketAddress> getSchedulerAddresses()
+      throws IOException {
+    return getAddresses(RM_SCHEDULER_ADDRESS,
+        DEFAULT_RM_SCHEDULER_ADDRESS, DEFAULT_RM_SCHEDULER_PORT);
+  }
+
+  @Override
+  public Map<String, InetSocketAddress> getResourceTrackerAddresses()
+      throws IOException {
+    return getAddresses(RM_RESOURCE_TRACKER_ADDRESS,
+        DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        DEFAULT_RM_RESOURCE_TRACKER_PORT);
+  }
+
+  /**
+   * Get the socket address of each RM identifier for a configuration key.
+   * @param confKey Base configuration key for the address.
+   * @param defaultAddress Default address if the key is not set.
+   * @param defaultPort Default port if the key does not set one.
+   * @return Map from RM identifier to socket address.
+   * @throws IOException If the addresses cannot be obtained.
+   */
+  private Map<String, InetSocketAddress> getAddresses(final String confKey,
+      final String defaultAddress, final int defaultPort) throws IOException {
+    Configuration conf = getConf();
+    Collection<String> rmIds = conf.getStringCollection(RM_HA_IDS);
+    Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
+    for (String rmId : rmIds) {
+      InetSocketAddress addr = conf.getSocketAddr(
+          confKey + "." + rmId, defaultAddress, defaultPort);
+      ret.put(rmId, addr);
+    }
+    return ret;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/SubclusterDiscovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/SubclusterDiscovery.java
new file mode 100644
index 00000000000..c8c34d04456
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/SubclusterDiscovery.java
@@ -0,0 +1,142 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.discovery;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_ADMIN_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_HA_IDS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_SCHEDULER_ADDRESS;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Discover the subcluster the NodeManager belongs to.
+ */
+public abstract class SubclusterDiscovery implements Configurable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SubclusterDiscovery.class);
+
+  /** Local configuration. */
+  private Configuration conf;
+
+  /**
+   * Get the subcluster discovery implementation defined in the configuration.
+   * @param conf Configuration defining the discovery implementation.
+   * @return Subcluster discovery implementation.
+   */
+  public static SubclusterDiscovery get(Configuration conf) {
+    Class<? extends SubclusterDiscovery> clazz = conf.getClass(
+        YarnConfiguration.SUBCLUSTER_DISCOVERY_CLASS,
+        YarnConfiguration.SUBCLUSTER_DISCOVERY_CLASS_DEFAULT,
+        SubclusterDiscovery.class);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    this.conf = config;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Get the RPC address of each RM in the subcluster.
+   * @return Map from RM identifier to RPC address.
+   * @throws IOException If the addresses cannot be obtained.
+   */
+  public abstract Map<String, InetSocketAddress> getRpcAddresses()
+      throws IOException;
+
+  /**
+   * Get the admin address of each RM in the subcluster.
+   * @return Map from RM identifier to admin address.
+   * @throws IOException If the addresses cannot be obtained.
+   */
+  public abstract Map<String, InetSocketAddress> getAdminAddresses()
+      throws IOException;
+
+  /**
+   * Get the scheduler address of each RM in the subcluster.
+   * @return Map from RM identifier to scheduler address.
+   * @throws IOException If the addresses cannot be obtained.
+   */
+  public abstract Map<String, InetSocketAddress> getSchedulerAddresses()
+      throws IOException;
+
+  /**
+   * Get the resource tracker address of each RM in the subcluster.
+   * @return Map from RM identifier to resource tracker address.
+   * @throws IOException If the addresses cannot be obtained.
+   */
+  public abstract Map<String, InetSocketAddress> getResourceTrackerAddresses()
+      throws IOException;
+
+  /**
+   * Populate the RM endpoints in a configuration.
+   * @param inConf Configuration to populate with the RM addresses.
+   */
+  public void populateRMConf(Configuration inConf) {
+    try {
+      Map<String, InetSocketAddress> addrs = getRpcAddresses();
+      Map<String, InetSocketAddress> adminAddrs = getAdminAddresses();
+      Map<String, InetSocketAddress> schedAddrs = getSchedulerAddresses();
+      Map<String, InetSocketAddress> rtAddrs = getResourceTrackerAddresses();
+
+      String rmIds = StringUtils.join(",", addrs.keySet());
+      inConf.set(RM_HA_IDS, rmIds);
+
+      populateRMConf(inConf, RM_ADDRESS, addrs);
+      populateRMConf(inConf, RM_ADMIN_ADDRESS, adminAddrs);
+      populateRMConf(inConf, RM_SCHEDULER_ADDRESS, schedAddrs);
+      populateRMConf(inConf, RM_RESOURCE_TRACKER_ADDRESS, rtAddrs);
+    } catch (IOException ioe) {
+      LOG.error("Cannot populate the configuration with the RM addresses", ioe);
+    }
+  }
+
+  /**
+   * Set the addresses for a configuration key in a configuration.
+   * @param inConf Configuration to populate.
+   * @param confKey Base configuration key for the address.
+   * @param addrs Map from RM identifier to socket address.
+   */
+  private static void populateRMConf(Configuration inConf,
+      final String confKey, final Map<String, InetSocketAddress> addrs) {
+    for (Entry<String, InetSocketAddress> entry : addrs.entrySet()) {
+      String rmId = entry.getKey();
+      InetSocketAddress addr = entry.getValue();
+      inConf.setSocketAddr(confKey + "." + rmId, addr);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/ZookeeperBasedSubclusterDiscovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/ZookeeperBasedSubclusterDiscovery.java
new file mode 100644
index 00000000000..6e3d8752ea6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/discovery/ZookeeperBasedSubclusterDiscovery.java
@@ -0,0 +1,116 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.discovery;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Discover the subcluster the NodeManager belongs to using Zookeeper.
+ */
+public class ZookeeperBasedSubclusterDiscovery extends SubclusterDiscovery {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZookeeperBasedSubclusterDiscovery.class);
+
+  /** Interface to Zookeeper. */
+  private ZKCuratorManager zkManager;
+
+  /** Parent znode with one child per RM identifier. */
+  private String baseZNode;
+
+  /**
+   * Initialize the Zookeeper connection lazily.
+   * @throws IOException If the connection cannot be established.
+   */
+  private void init() throws IOException {
+    if (zkManager == null) {
+      Configuration conf = getConf();
+      baseZNode = conf.get(
+          YarnConfiguration.SUBCLUSTER_DISCOVERY_ZK_PARENT_PATH,
+          YarnConfiguration.DEFAULT_SUBCLUSTER_DISCOVERY_ZK_PARENT_PATH);
+
+      try {
+        this.zkManager = new ZKCuratorManager(conf);
+        this.zkManager.start();
+      } catch (IOException e) {
+        LOG.error("Cannot initialize the ZK connection", e);
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public Map<String, InetSocketAddress> getRpcAddresses() throws IOException {
+    return getAddresses("address");
+  }
+
+  @Override
+  public Map<String, InetSocketAddress> getAdminAddresses()
+      throws IOException {
+    return getAddresses("admin");
+  }
+
+  @Override
+  public Map<String, InetSocketAddress> getSchedulerAddresses()
+      throws IOException {
+    return getAddresses("scheduler");
+  }
+
+  @Override
+  public Map<String, InetSocketAddress> getResourceTrackerAddresses()
+      throws IOException {
+    return getAddresses("resourceTracker");
+  }
+
+  /**
+   * Get the address stored in Zookeeper for each RM identifier.
+   * @param attr Name of the znode storing the address.
+   * @return Map from RM identifier to socket address.
+   * @throws IOException If the addresses cannot be obtained from Zookeeper.
+   */
+  private Map<String, InetSocketAddress> getAddresses(final String attr)
+      throws IOException {
+    init();
+    Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
+    try {
+      List<String> rmIds = zkManager.getChildren(baseZNode);
+      for (String rmId : rmIds) {
+        String address = zkManager.getStringData(
+            baseZNode + "/" + rmId + "/" + attr);
+        InetSocketAddress addr = NetUtils.createSocketAddr(address);
+        ret.put(rmId, addr);
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot get the addresses", e);
+      throw new IOException("Cannot get the addresses from " + baseZNode, e);
+    }
+    return ret;
+  }
+}