diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn b/hadoop-yarn-project/hadoop-yarn/bin/yarn
index a98f3e6..d1f465f 100644
--- a/hadoop-yarn-project/hadoop-yarn/bin/yarn
+++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn
@@ -41,6 +41,7 @@ function hadoop_usage
   echo "  sharedcachemanager                    run the SharedCacheManager daemon"
   echo "  timelineserver                        run the timeline server"
   echo "  top                                   view cluster information"
+  echo "  waitrm                                set flag to let NM wait for RM to come back"
   echo "  version                               print the version"
   echo ""
   echo "Most commands print help when invoked w/o parameters."
@@ -183,6 +184,12 @@ case "${COMMAND}" in
     hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS"
     HADOOP_OPTS="${HADOOP_OPTS} ${YARN_CLIENT_OPTS}"
   ;;
+  waitrm)
+    CLASS=org.apache.hadoop.yarn.client.cli.ZKCLI
+    hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS"
+    HADOOP_OPTS="${HADOOP_OPTS} ${YARN_CLIENT_OPTS}"
+    set -- "${COMMAND}" "$@"
+  ;;
   top)
     doNotSetCols=0
     doNotSetRows=0
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 253ae08..16b893d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -414,6 +414,12 @@ private static void addDeprecatedKeys() {
   public static final String ZK_STATE_STORE_PREFIX =
       RM_PREFIX + "zk-state-store.";
 
+  /** Base znode path under which the per-cluster wait-RM znode is looked up. */
+  public static final String ZK_WAIT_RM_BASE_PATH =
+      RM_PREFIX + "wait-rm-base-path";
+  /** Default value of {@link #ZK_WAIT_RM_BASE_PATH}. */
+  public static final String DEFAULT_ZK_WAIT_RM_BASE_PATH = "/wait-rm";
+
   /** Parent znode path under which ZKRMStateStore will create znodes */
   public static final String ZK_RM_STATE_STORE_PARENT_PATH =
       ZK_STATE_STORE_PREFIX + "parent-path";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index cb4df3a..3bb652b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -113,8 +113,11 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-      <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+    </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index b1ab5f1..c1017e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -79,6 +79,9 @@
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
 
 public class NodeStatusUpdaterImpl extends AbstractService implements
     NodeStatusUpdater {
@@ -749,10 +752,15 @@ public void run() {
                 .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
             }
           } catch (ConnectException e) {
-            //catch and throw the exception if tried MAX wait time to connect RM
-            dispatcher.getEventHandler().handle(
-                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
-            throw new YarnRuntimeException(e);
+            if (needWaitRM()) {
+              LOG.info("The wait RM flag is set. Just wait RM to come back...");
+            } else {
+              LOG.info("The wait RM flag is not set. Just exit...");
+              //catch and throw the exception if tried MAX wait time to connect RM
+              dispatcher.getEventHandler().handle(
+                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+              throw new YarnRuntimeException(e);
+            }
           } catch (Throwable e) {
 
             // TODO Better error handling. Thread can die with the rest of the
@@ -803,6 +811,56 @@ private void updateMasterKeys(NodeHeartbeatResponse response) {
           context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
         }
       }
+
+      /**
+       * Tells whether this NM should keep waiting for the RM to come back
+       * instead of shutting down after a failed heartbeat. The flag is the
+       * existence of the znode named by getWaitRMNodePath(Configuration).
+       *
+       * @return true if the znode exists; false if it is absent or could
+       *         not be checked within the retry budget
+       */
+      private boolean needWaitRM() {
+        // Reuse the service configuration rather than re-parsing the
+        // configuration resources on every failed heartbeat.
+        Configuration conf = getConfig();
+        final int retryTimes = 3;
+        // Remember an interrupt and restore it only after cleanup is done.
+        boolean interrupted = false;
+        try {
+          for (int i = 0; i < retryTimes && !interrupted; i++) {
+            ZooKeeper zkClient = null;
+            String waitRMNode = null;
+            try {
+              // Resolve the path first so the error log can report it.
+              waitRMNode = NodeStatusUpdaterImpl.getWaitRMNodePath(conf);
+              zkClient = NodeStatusUpdaterImpl.getNewZooKeeper(conf);
+              return zkClient.exists(waitRMNode, null) != null;
+            } catch (InterruptedException e) {
+              interrupted = true;
+              LOG.error("Interrupted while checking znode " + waitRMNode, e);
+            } catch (Throwable e) {
+              // Best effort: keep the stack trace and try again.
+              LOG.error("Failed to get info from znode " + waitRMNode
+                  + ". Remain retry times : " + (retryTimes - i - 1) + ".", e);
+            } finally {
+              if (zkClient != null) {
+                try {
+                  zkClient.close();
+                } catch (InterruptedException e) {
+                  interrupted = true;
+                  LOG.error("Failed to close zkClient in needWaitRM.", e);
+                }
+              }
+            }
+          }
+          return false;
+        } finally {
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
     };
     statusUpdater =
         new Thread(statusUpdaterRunnable, "Node Status Updater");
@@ -852,4 +910,49 @@ private void updateMasterKeys(NodeHeartbeatResponse response) {
     }
     return latestLogAggregationReports;
   }
+
+  /**
+   * Builds the path of the znode whose existence tells NMs to wait for
+   * the RM.
+   *
+   * @param conf configuration supplying the base path and the cluster id
+   * @return the configured base path and the cluster id joined by "/"
+   */
+  public static String getWaitRMNodePath(Configuration conf) {
+    String basePath = conf.get(YarnConfiguration.ZK_WAIT_RM_BASE_PATH,
+        YarnConfiguration.DEFAULT_ZK_WAIT_RM_BASE_PATH);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+    return basePath + "/" + clusterId;
+  }
+
+  /**
+   * Creates a new ZooKeeper client for the quorum configured under
+   * YarnConfiguration.RM_ZK_ADDRESS. Callers should close the returned
+   * client when done with it.
+   *
+   * @param conf configuration supplying the ZK address and session timeout
+   * @return a newly constructed ZooKeeper client
+   * @throws IOException if the ZooKeeper client cannot be constructed
+   * @throws YarnRuntimeException if no ZK address is configured
+   */
+  public static ZooKeeper getNewZooKeeper(Configuration conf)
+      throws IOException {
+    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    if (zkHostPort == null) {
+      throw new YarnRuntimeException("No server address specified for " +
+          "zookeeper state store for Resource Manager recovery. " +
+          YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
+    }
+    int zkSessionTimeout =
+        conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+            YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+    // Hand the no-op watcher to the constructor so the client never runs
+    // with a null default watcher while session events may arrive.
+    return new ZooKeeper(zkHostPort, zkSessionTimeout, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // Session events are deliberately ignored.
+      }
+    });
+  }
 }