diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9ec25ae..31851bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1950,7 +1950,11 @@ private static void addDeprecatedKeys() { public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy"; public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY .name(); - + + /** Max time to wait for NM to connection to RM. */ + public static final String NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS = + YARN_PREFIX + "nodemanager.rm.connect.max-wait.ms"; + /** * Node-labels configurations */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index b76defb..7e519cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1499,6 +1499,12 @@ + Time interval between each attempt to connect to NM + yarn.nodemanager.rm.connect.max-wait.ms + 10000 + + + Maximum number of proxy connections to cache for node managers. If set to a value greater than zero then the cache is enabled and the NMClient diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 05efc69..5dfdceec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -304,6 +304,16 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { @VisibleForTesting protected ResourceTracker getRMClient() throws IOException { Configuration conf = getConfig(); + long rmConnectWaitMS = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); + long nmRmConnectWait = + conf.getLong( + YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + rmConnectWaitMS); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + nmRmConnectWait); return ServerRMProxy.createRMProxy(conf, ResourceTracker.class); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 3c0368b..22efcd7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -442,6 +443,35 @@ protected void stopRMProxy() { } } + private class MyNodeStatusUpdater6 extends NodeStatusUpdaterImpl { + + private final long rmStartIntervalMS; + private final boolean rmNeverStart; + public ResourceTracker resourceTracker; + public MyNodeStatusUpdater6(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + long rmStartIntervalMS, boolean rmNeverStart) { + super(context, dispatcher, healthChecker, metrics); + this.rmStartIntervalMS = rmStartIntervalMS; + this.rmNeverStart = rmNeverStart; + } + + @Override + protected void serviceStart() throws Exception { + //record the startup time + super.serviceStart(); + } + + private boolean isTriggered() { + return triggered; + } + + @Override + protected void stopRMProxy() { + return; + } + } + private class MyNodeManager extends NodeManager { private MyNodeStatusUpdater3 nodeStatusUpdater; @@ -1266,6 +1296,50 @@ protected NodeStatusUpdater createUpdater(Context context, } @Test (timeout = 150000) + public void testNMRMConnectionConf() throws Exception { + final long connectionWaitMs = 1000; + final long nmConnectionWaitMs = 10000; + final long connectionRetryIntervalMs = 1000; + //Waiting for rmStartIntervalMS, RM will be started + final long rmStartIntervalMS = 2*1000; + conf.setLong(YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + nmConnectionWaitMs); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + connectionWaitMs); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + connectionRetryIntervalMs); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + 1); + //Test NM try to connect to RM Several times, but finally fail + NodeManagerWithCustomNodeStatusUpdater nmWithUpdater; + nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { + @Override + protected NodeStatusUpdater createUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater6( + context, dispatcher, healthChecker, metrics, + rmStartIntervalMS, true); + return nodeStatusUpdater; + } + }; + nm.init(conf); + long waitStartTime = System.currentTimeMillis(); + try { + nm.start(); + Assert.fail("NM should have failed to start due to RM connect failure"); + } catch(Exception e) { + long t = System.currentTimeMillis(); + long duration = t - waitStartTime; + boolean waitTimeValid = (duration >= nmConnectionWaitMs); + + if(!waitTimeValid) { + // throw exception if NM doesn't retry long enough + throw new Exception("NM didn't try to connect to RM long enough"); + } + } + } + + @Test (timeout = 150000) public void testNMConnectionToRM() throws Exception { final long delta = 50000; final long connectionWaitMs = 5000;