diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8aa136dc522..cc272ee8b07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -38,6 +38,8 @@ import org.apache.hadoop.util.BasicDiskValidator; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.discovery.ConfigurationBasedSubclusterDiscovery; +import org.apache.hadoop.yarn.discovery.SubclusterDiscovery; @Public @Evolving @@ -3644,6 +3646,17 @@ public static boolean areNodeLabelsEnabled( public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD = "/usr/bin/numactl"; + /** Class to discover subclusters. */ + public static final String SUBCLUSTER_DISCOVERY_CLASS = + "yarn.subcluster.discovery.class"; + public static final Class + SUBCLUSTER_DISCOVERY_CLASS_DEFAULT = + ConfigurationBasedSubclusterDiscovery.class; + public static final String SUBCLUSTER_DISCOVERY_ZK_PARENT_PATH = + "yarn.subcluster.discovery.zk.parent-path"; + public static final String DEFAULT_SUBCLUSTER_DISCOVERY_ZK_PARENT_PATH = + "/subclusterdiscovery"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/ConfigurationBasedSubclusterDiscovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/ConfigurationBasedSubclusterDiscovery.java new file mode 100644 index 00000000000..ea91cb3ec02 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/ConfigurationBasedSubclusterDiscovery.java @@ -0,0 +1,52 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.discovery; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.collect.Maps; + + +/** + * Discover thesubcluster the NodeManager belongs to using the local + * configuration. + */ +public class ConfigurationBasedSubclusterDiscovery extends SubclusterDiscovery { + + @Override + public Map getRpcAddresses() throws IOException { + Configuration conf = getConf(); + Collection rmIds = + conf.getStringCollection(YarnConfiguration.RM_HA_IDS); + Map ret = Maps.newLinkedHashMap(); + for (String rmId : rmIds) { + InetSocketAddress addr = conf.getSocketAddr( + YarnConfiguration.RM_ADDRESS + "." + rmId, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + ret.put(rmId, addr); + } + return ret; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/SubclusterDiscovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/SubclusterDiscovery.java new file mode 100644 index 00000000000..9e3f3b77ee2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/SubclusterDiscovery.java @@ -0,0 +1,59 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.discovery; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Discover the subcluster the NodeManager belongs to. + */ +public abstract class SubclusterDiscovery implements Configurable { + + /** Local configuration. */ + private Configuration conf; + + + public static SubclusterDiscovery get(Configuration conf) { + Class clazz = conf.getClass( + YarnConfiguration.SUBCLUSTER_DISCOVERY_CLASS, + YarnConfiguration.SUBCLUSTER_DISCOVERY_CLASS_DEFAULT, + SubclusterDiscovery.class); + new ConfigurationBasedSubclusterDiscovery(); + return ReflectionUtils.newInstance(clazz, conf); + } + + @Override + public void setConf(Configuration config) { + this.conf = config; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + public abstract Map getRpcAddresses() + throws IOException; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/ZookeeperBasedSubclusterDiscovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/ZookeeperBasedSubclusterDiscovery.java new file mode 100644 index 00000000000..698562edeb0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/discovery/ZookeeperBasedSubclusterDiscovery.java @@ -0,0 +1,87 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.discovery; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + + +/** + * Discover the subcluster the NodeManager belongs to using the Zookeeper. + */ +public class ZookeeperBasedSubclusterDiscovery extends SubclusterDiscovery { + + private static final Logger LOG = + LoggerFactory.getLogger(ZookeeperBasedSubclusterDiscovery.class); + + + /** Interface to Zookeeper. */ + private ZKCuratorManager zkManager; + + /** Directory to store the state store data. */ + private String baseZNode; + + /** + * Initialize the Zookeeper connection. + */ + private void init() { + if (zkManager == null) { + Configuration conf = getConf(); + baseZNode = conf.get( + YarnConfiguration.SUBCLUSTER_DISCOVERY_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_SUBCLUSTER_DISCOVERY_ZK_PARENT_PATH); + + try { + this.zkManager = new ZKCuratorManager(conf); + this.zkManager.start(); + } catch (IOException e) { + LOG.error("Cannot initialize the ZK connection", e); + } + } + } + + @Override + public Map getRpcAddresses() throws IOException { + init(); + Map ret = Maps.newLinkedHashMap(); + try { + List rmIds = zkManager.getChildren(baseZNode); + for (String rmId : rmIds) { + String address = zkManager.getStringData( + baseZNode + "/" + rmId + "/address"); + InetSocketAddress addr = NetUtils.createSocketAddr(address); + ret.put(rmId, addr); + } + } catch (Exception e) { + LOG.error("Cannot get the addresses", e); + new IOException(e.getMessage()); + } + return ret; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8154723f08f..b13948e6490 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.ConnectException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.discovery.SubclusterDiscovery; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -343,6 +345,17 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { @VisibleForTesting protected ResourceTracker getRMClient() throws IOException { Configuration conf = getConfig(); + + SubclusterDiscovery discovery = SubclusterDiscovery.get(conf); + Map addrs = discovery.getRpcAddresses(); + String rmIds = StringUtils.join(",", addrs.keySet()); + conf.set(YarnConfiguration.RM_HA_IDS, rmIds); + for (Entry entry : addrs.entrySet()) { + String rmId = entry.getKey(); + InetSocketAddress addr = entry.getValue(); + conf.setSocketAddr(YarnConfiguration.RM_ADDRESS + "." + rmId, addr); + } + return ServerRMProxy.createRMProxy(conf, ResourceTracker.class); }