diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn b/hadoop-yarn-project/hadoop-yarn/bin/yarn
index a98f3e6..d1f465f 100644
--- a/hadoop-yarn-project/hadoop-yarn/bin/yarn
+++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn
@@ -41,6 +41,7 @@ function hadoop_usage
echo " sharedcachemanager run the SharedCacheManager daemon"
echo " timelineserver run the timeline server"
echo " top view cluster information"
+ echo " waitrm set flag to let NM wait for RM to come back"
echo " version print the version"
echo ""
echo "Most commands print help when invoked w/o parameters."
@@ -183,6 +184,11 @@ case "${COMMAND}" in
hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${YARN_CLIENT_OPTS}"
;;
+ waitrm)
+   # Per the usage text, this subcommand sets the flag that lets NMs wait
+   # for the RM to come back; it is implemented by the ZKCLI client class.
+   CLASS=org.apache.hadoop.yarn.client.cli.ZKCLI
+   hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS"
+   HADOOP_OPTS="${HADOOP_OPTS} ${YARN_CLIENT_OPTS}"
+   # Put the subcommand name back in front of the remaining arguments
+   # (presumably so the CLI class sees it as its first argument).
+   set -- "${COMMAND}" "$@"
+ ;;
top)
doNotSetCols=0
doNotSetRows=0
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 253ae08..16b893d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -414,6 +414,10 @@ private static void addDeprecatedKeys() {
public static final String ZK_STATE_STORE_PREFIX =
RM_PREFIX + "zk-state-store.";
+ /**
+  * Configuration key for the base znode path of the "wait for RM" flag.
+  * NodeStatusUpdaterImpl#getWaitRMNodePath appends "/" and the cluster id
+  * to this base path to obtain the flag znode.
+  */
+ public static final String ZK_WAIT_RM_BASE_PATH =
+ RM_PREFIX + "wait-rm-base-path";
+ /** Base znode path used when {@link #ZK_WAIT_RM_BASE_PATH} is not set. */
+ public static final String DEFAULT_ZK_WAIT_RM_BASE_PATH = "/wait-rm";
+
/** Parent znode path under which ZKRMStateStore will create znodes */
public static final String ZK_RM_STATE_STORE_PARENT_PATH =
ZK_STATE_STORE_PREFIX + "parent-path";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index cb4df3a..3bb652b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -113,8 +113,11 @@
org.apache.hadoop
hadoop-yarn-server-resourcemanager
- test
+
+ org.apache.hadoop
+ hadoop-yarn-server-nodemanager
+
org.apache.hadoop
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index b1ab5f1..c1017e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -79,6 +79,9 @@
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdater {
@@ -749,10 +752,15 @@ public void run() {
.setSystemCrendentialsForApps(parseCredentials(systemCredentials));
}
} catch (ConnectException e) {
- //catch and throw the exception if tried MAX wait time to connect RM
- dispatcher.getEventHandler().handle(
- new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
- throw new YarnRuntimeException(e);
+ if (needWaitRM()) {
+ LOG.info("The wait RM flag is set. Just wait RM to come back...");
+ } else {
+ LOG.info("The wait RM flag is not set. Just exit...");
+ //catch and throw the exception if tried MAX wait time to connect RM
+ dispatcher.getEventHandler().handle(
+ new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+ throw new YarnRuntimeException(e);
+ }
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
@@ -803,6 +811,33 @@ private void updateMasterKeys(NodeHeartbeatResponse response) {
context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
}
}
+
+ /**
+  * Decides whether the NM should keep waiting for the RM after it failed
+  * to connect for a heartbeat, by checking whether the "wait RM" flag znode
+  * (see {@link NodeStatusUpdaterImpl#getWaitRMNodePath}) exists in
+  * ZooKeeper. The lookup is attempted up to three times, each time on a
+  * fresh ZooKeeper client that is closed afterwards.
+  *
+  * @return true if the flag znode exists; false if it does not exist, if
+  *         every attempt failed, or if the thread was interrupted
+  */
+ private boolean needWaitRM() {
+   // NOTE(review): a fresh configuration is built on every call; reusing
+   // the service's own configuration may be preferable -- confirm.
+   Configuration conf = new YarnConfiguration();
+   final int retryTimes = 3;
+   for (int i = 0; i < retryTimes; i++) {
+     ZooKeeper zkClient = null;
+     String waitRMNode = null;
+     try {
+       // Resolve the path first so the failure log below can name the
+       // znode even when the ZooKeeper client cannot be created.
+       waitRMNode = NodeStatusUpdaterImpl.getWaitRMNodePath(conf);
+       zkClient = NodeStatusUpdaterImpl.getNewZooKeeper(conf);
+       return zkClient.exists(waitRMNode, null) != null;
+     } catch (InterruptedException e) {
+       // Restore the interrupt flag for the caller and stop retrying:
+       // further blocking ZooKeeper calls would only be interrupted again.
+       Thread.currentThread().interrupt();
+       LOG.error("Interrupted while getting info from znode "
+           + waitRMNode + ".", e);
+       return false;
+     } catch (Throwable e) {
+       // Deliberately broad: the check is best-effort and any failure must
+       // lead to a retry (or the default answer), not escape to the caller.
+       // The exception is passed separately so its stack trace is logged.
+       LOG.error("Failed to get info from znode " + waitRMNode
+           + ". Remain retry times : " + (retryTimes - i - 1) + ".", e);
+     } finally {
+       if (zkClient != null) {
+         try {
+           zkClient.close();
+         } catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
+           LOG.error("Failed to close zkClient in needWaitRM.", e);
+         }
+       }
+     }
+   }
+   return false;
+ }
};
statusUpdater =
new Thread(statusUpdaterRunnable, "Node Status Updater");
@@ -852,4 +887,31 @@ private void updateMasterKeys(NodeHeartbeatResponse response) {
}
return latestLogAggregationReports;
}
+
+ /**
+  * Builds the path of the "wait RM" flag znode: the configured base path
+  * ({@link YarnConfiguration#ZK_WAIT_RM_BASE_PATH}, defaulting to
+  * {@link YarnConfiguration#DEFAULT_ZK_WAIT_RM_BASE_PATH}) followed by "/"
+  * and the cluster id.
+  *
+  * @param conf configuration to read the base path and cluster id from
+  * @return the flag znode path; trailing slashes of the base path are
+  *         dropped so the join never produces an empty path segment
+  */
+ public static String getWaitRMNodePath(Configuration conf) {
+   String basePath = conf.get(YarnConfiguration.ZK_WAIT_RM_BASE_PATH,
+       YarnConfiguration.DEFAULT_ZK_WAIT_RM_BASE_PATH);
+   // A base path such as "/wait-rm/" or "/" would otherwise yield "//",
+   // which is not a valid ZooKeeper path.
+   int end = basePath.length();
+   while (end > 0 && basePath.charAt(end - 1) == '/') {
+     end--;
+   }
+   String clusterId = YarnConfiguration.getClusterId(conf);
+   return basePath.substring(0, end) + "/" + clusterId;
+ }
+
+ /**
+  * Creates a new ZooKeeper client for the address configured under
+  * {@link YarnConfiguration#RM_ZK_ADDRESS}, with the session timeout from
+  * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS}. The client gets a no-op
+  * default watcher. The caller owns the returned client and must close it.
+  *
+  * @param conf configuration to read the address and session timeout from
+  * @return a newly constructed ZooKeeper client
+  * @throws IOException declared by the ZooKeeper constructor
+  * @throws YarnRuntimeException if no ZooKeeper address is configured
+  */
+ public static ZooKeeper getNewZooKeeper(Configuration conf) throws IOException {
+   String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+   // Treat a blank value like a missing one rather than handing an empty
+   // connect string to the ZooKeeper client.
+   if (zkHostPort == null || zkHostPort.trim().isEmpty()) {
+     throw new YarnRuntimeException("No server address specified for " +
+         "zookeeper state store for Resource Manager recovery. " +
+         YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
+   }
+   int zkSessionTimeout =
+       conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+           YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+   // Install the no-op watcher through the constructor instead of passing
+   // null and calling register() afterwards, so no event can be dispatched
+   // while the default watcher is still null.
+   return new ZooKeeper(zkHostPort, zkSessionTimeout, new Watcher() {
+     @Override
+     public void process(WatchedEvent event) {
+       // Session/connection events are intentionally ignored.
+     }
+   });
+ }
}