diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 18e6082..c0399f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2050,7 +2050,15 @@ private static void addDeprecatedKeys() { public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy"; public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY .name(); - + + /** Max time to wait for NM to connection to RM. */ + public static final String NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS = + YARN_PREFIX + "nodemanager.resourcemanager.connect.max-wait.ms"; + + /** Time interval between each NM attempt to connection to RM. */ + public static final String NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL = + YARN_PREFIX + "nodemanager.resourcemanager.connect.retry-interval.ms"; + /** * Node-labels configurations */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index b29263e..1d448a2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -69,7 +69,17 @@ private ClientRMProxy(){ */ public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { - return createRMProxy(configuration, protocol, INSTANCE); + long clientRmConnectWait = + configuration.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); + long clientRmRetryInterval = + configuration.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); + return createRMProxy(configuration, protocol, INSTANCE, + clientRmConnectWait, clientRmRetryInterval); } private static void setAMRMTokenService(final Configuration conf) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 23e1691..48970e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -84,11 +84,13 @@ protected InetSocketAddress getRMAddress( */ @Private protected static T createRMProxy(final Configuration configuration, - final Class protocol, RMProxy instance) throws IOException { + final Class protocol, RMProxy instance, final long retryTime, + final long retryInterval) throws IOException { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? (YarnConfiguration) configuration : new YarnConfiguration(configuration); - RetryPolicy retryPolicy = createRetryPolicy(conf); + RetryPolicy retryPolicy = + createRetryPolicy(retryTime, retryInterval, conf); if (HAUtil.isHAEnabled(conf)) { RMFailoverProxyProvider provider = instance.createRMFailoverProxyProvider(conf, protocol); @@ -119,7 +121,17 @@ protected InetSocketAddress getRMAddress( @Deprecated public static T createRMProxy(final Configuration conf, final Class protocol, InetSocketAddress rmAddress) throws IOException { - RetryPolicy retryPolicy = createRetryPolicy(conf); + long rmConnectWait = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); + long rmRetryInterval = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); + RetryPolicy retryPolicy = createRetryPolicy( + rmConnectWait, rmRetryInterval, conf); T proxy = RMProxy.getProxy(conf, protocol, rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress); return (T) RetryProxy.create(protocol, proxy, retryPolicy); @@ -169,16 +181,10 @@ public T run() { */ @Private @VisibleForTesting - public static RetryPolicy createRetryPolicy(Configuration conf) { - long rmConnectWaitMS = - conf.getLong( - YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, - YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); - long rmConnectionRetryIntervalMS = - conf.getLong( - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, - YarnConfiguration - .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); + public static RetryPolicy createRetryPolicy(long retryTime, + long retryInterval, Configuration conf) { + long rmConnectWaitMS = retryTime; + long rmConnectionRetryIntervalMS = retryInterval; boolean waitForEver = (rmConnectWaitMS == -1); if (!waitForEver) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 997eb8e..1353323 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1550,6 +1550,18 @@ + Max time to wait for NM to connection to RM. + yarn.nodemanager.resourcemanager.connect.max-wait.ms + + + + + Time interval between each NM attempt to connection to RM + yarn.nodemanager.resourcemanager.connect.retry-interval.ms + + + + Maximum number of proxy connections to cache for node managers. If set to a value greater than zero then the cache is enabled and the NMClient diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index 5d4fc46..2d4085f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -48,8 +48,26 @@ private ServerRMProxy() { */ public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { - return createRMProxy(configuration, protocol, INSTANCE); -} + long rmConnectWait = + configuration.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); + long rmRetryInterval = + configuration.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); + long nmRmConnectWait = + configuration.getLong( + YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + rmConnectWait); + long nmRmRetryInterval = + configuration.getLong( + YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + rmRetryInterval); + return createRMProxy(configuration, protocol, INSTANCE, + nmRmConnectWait, nmRmRetryInterval); + } @InterfaceAudience.Private @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index e231f1b..90f17e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -442,7 +443,17 @@ protected void serviceStart() throws Exception { @Override protected ResourceTracker getRMClient() throws IOException { - RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + long rmConnectWait = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); + long rmRetryInterval = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(rmConnectWait, + rmRetryInterval, conf); resourceTracker = (ResourceTracker) RetryProxy.create(ResourceTracker.class, new MyResourceTracker6(rmStartIntervalMS, rmNeverStart), @@ -475,7 +486,17 @@ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, @Override protected ResourceTracker getRMClient() { - RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + long rmConnectWait = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); + long rmRetryInterval = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); + RetryPolicy retryPolicy = RMProxy.createRetryPolicy( + rmConnectWait, rmRetryInterval, conf); return (ResourceTracker) RetryProxy.create(ResourceTracker.class, resourceTracker, retryPolicy); } @@ -486,6 +507,35 @@ protected void stopRMProxy() { } } + private class MyNodeStatusUpdater6 extends NodeStatusUpdaterImpl { + + private final long rmStartIntervalMS; + private final boolean rmNeverStart; + public ResourceTracker resourceTracker; + public MyNodeStatusUpdater6(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + long rmStartIntervalMS, boolean rmNeverStart) { + super(context, dispatcher, healthChecker, metrics); + this.rmStartIntervalMS = rmStartIntervalMS; + this.rmNeverStart = rmNeverStart; + } + + @Override + protected void serviceStart() throws Exception { + //record the startup time + super.serviceStart(); + } + + private boolean isTriggered() { + return triggered; + } + + @Override + protected void stopRMProxy() { + return; + } + } + private class MyNodeManager extends NodeManager { private MyNodeStatusUpdater3 nodeStatusUpdater; @@ -1309,6 +1359,59 @@ protected NodeStatusUpdater createUpdater(Context context, + "Message from ResourceManager: RM Shutting Down Node"); } + @Test (timeout = 100000) + public void testNMRMConnectionConf() throws Exception { + final long delta = 50000; + final long nmRmConnectionWaitMs = 100; + final long nmRmRetryInterval = 100; + final long connectionWaitMs = -1; + final long connectionRetryIntervalMs = 1000; + //Waiting for rmStartIntervalMS, RM will be started + final long rmStartIntervalMS = 2*1000; + conf.setLong(YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + nmRmConnectionWaitMs); + conf.setLong( + YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + nmRmRetryInterval); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + connectionWaitMs); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + connectionRetryIntervalMs); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + 1); + //Test NM try to connect to RM Several times, but finally fail + NodeManagerWithCustomNodeStatusUpdater nmWithUpdater; + nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { + @Override + protected NodeStatusUpdater createUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater6( + context, dispatcher, healthChecker, metrics, + rmStartIntervalMS, true); + return nodeStatusUpdater; + } + }; + nm.init(conf); + long waitStartTime = System.currentTimeMillis(); + try { + nm.start(); + Assert.fail("NM should have failed to start due to RM connect failure"); + } catch(Exception e) { + long t = System.currentTimeMillis(); + long duration = t - waitStartTime; + boolean waitTimeValid = (duration >= nmRmConnectionWaitMs) && + (duration < (connectionWaitMs + delta)); + + if(!waitTimeValid) { + // throw exception if NM doesn't retry long enough + throw new Exception("NM should have tried re-connecting to RM during " + + "period of at least " + connectionWaitMs + " ms, but " + + "stopped retrying within " + (connectionWaitMs + delta) + + " ms: " + e, e); + } + } + } + @Test (timeout = 150000) public void testNMConnectionToRM() throws Exception { final long delta = 50000;