diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 18e6082..15cf708 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2050,7 +2050,15 @@ private static void addDeprecatedKeys() { public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy"; public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY .name(); - + + /** Max time to wait for NM to connect to RM. */ + public static final String NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS = + YARN_PREFIX + "nodemanager.rm.connect.max-wait.ms"; + + /** Time interval between each NM attempt to connect to RM. */ + public static final String NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL = + YARN_PREFIX + "nodemanager.rm.connect.retry-interval.ms"; + /** * Node-labels configurations */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 23e1691..ac36368 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -83,12 +83,38 @@ protected InetSocketAddress getRMAddress( * well. */ @Private + protected static T createRMProxy(final long retryTime, + final long retryInterval, final Configuration configuration, + final Class protocol, RMProxy instance) throws IOException { + YarnConfiguration conf = (configuration instanceof YarnConfiguration) + ? 
(YarnConfiguration) configuration + : new YarnConfiguration(configuration); + RetryPolicy retryPolicy = createRetryPolicy(retryTime, retryInterval, conf); + if (HAUtil.isHAEnabled(conf)) { + RMFailoverProxyProvider provider = + instance.createRMFailoverProxyProvider(conf, protocol); + return (T) RetryProxy.create(protocol, provider, retryPolicy); + } else { + InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol); + LOG.info("Connecting to ResourceManager at " + rmAddress); + T proxy = RMProxy.getProxy(conf, protocol, rmAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } + } + + /** + * Create a proxy for the specified protocol. For non-HA, + * this is a direct connection to the ResourceManager address. When HA is + * enabled, the proxy handles the failover between the ResourceManagers as + * well. + */ + @Private protected static T createRMProxy(final Configuration configuration, final Class protocol, RMProxy instance) throws IOException { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? 
(YarnConfiguration) configuration : new YarnConfiguration(configuration); - RetryPolicy retryPolicy = createRetryPolicy(conf); + RetryPolicy retryPolicy = createRetryPolicy(-1, -1, conf); if (HAUtil.isHAEnabled(conf)) { RMFailoverProxyProvider provider = instance.createRMFailoverProxyProvider(conf, protocol); @@ -119,7 +145,7 @@ protected InetSocketAddress getRMAddress( @Deprecated public static T createRMProxy(final Configuration conf, final Class protocol, InetSocketAddress rmAddress) throws IOException { - RetryPolicy retryPolicy = createRetryPolicy(conf); + RetryPolicy retryPolicy = createRetryPolicy(-1, -1, conf); T proxy = RMProxy.getProxy(conf, protocol, rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress); return (T) RetryProxy.create(protocol, proxy, retryPolicy); @@ -169,12 +195,13 @@ public T run() { */ @Private @VisibleForTesting - public static RetryPolicy createRetryPolicy(Configuration conf) { - long rmConnectWaitMS = + public static RetryPolicy createRetryPolicy(long retryTime, + long retryInterval, Configuration conf) { + long rmConnectWaitMS = (retryTime != -1) ? retryTime: conf.getLong( YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); - long rmConnectionRetryIntervalMS = + long rmConnectionRetryIntervalMS = (retryInterval != -1) ? retryInterval: conf.getLong( YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, YarnConfiguration diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index bf94195..0c525d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1550,6 +1550,18 @@ + Max time to wait for NM to connect to RM. 
+ yarn.nodemanager.rm.connect.max-wait.ms + + + + + Time interval between each NM attempt to connect to RM + yarn.nodemanager.rm.connect.retry-interval.ms + + + + Maximum number of proxy connections to cache for node managers. If set to a value greater than zero then the cache is enabled and the NMClient diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index 5d4fc46..93f2efe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -40,6 +40,23 @@ private ServerRMProxy() { /** * Create a proxy to the ResourceManager for the specified protocol. + * @param retryTime retry time used by rmProxy. + * @param retryInterval retry interval used by rmProxy. + * @param configuration Configuration with all the required information. + * @param protocol Server protocol for which proxy is being requested. + * @param Type of proxy. + * @return Proxy to the ResourceManager for the specified server protocol. + * @throws IOException + */ + public static T createRMProxy(final long retryTime, + final long retryInterval, final Configuration configuration, + final Class protocol) throws IOException { + return createRMProxy(retryTime, retryInterval, + configuration, protocol, INSTANCE); + } + + /** + * Create a proxy to the ResourceManager for the specified protocol. * @param configuration Configuration with all the required information. * @param protocol Server protocol for which proxy is being requested. * @param Type of proxy. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3f8cf32..aeabc19 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -307,7 +307,14 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { @VisibleForTesting protected ResourceTracker getRMClient() throws IOException { Configuration conf = getConfig(); - return ServerRMProxy.createRMProxy(conf, ResourceTracker.class); + long nmRmConnectWait = + conf.getLong( + YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, -1); + long nmRmRetryInterval = + conf.getLong( + YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL, -1); + return ServerRMProxy.createRMProxy(nmRmConnectWait, + nmRmRetryInterval, conf, ResourceTracker.class); } @VisibleForTesting diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index e231f1b..3a6c540 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -442,7 +443,7 @@ protected void serviceStart() throws Exception { @Override protected ResourceTracker getRMClient() throws IOException { - RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(-1, -1, conf); resourceTracker = (ResourceTracker) RetryProxy.create(ResourceTracker.class, new MyResourceTracker6(rmStartIntervalMS, rmNeverStart), @@ -475,7 +476,7 @@ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, @Override protected ResourceTracker getRMClient() { - RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(-1, -1, conf); return (ResourceTracker) RetryProxy.create(ResourceTracker.class, resourceTracker, retryPolicy); } @@ -486,6 +487,35 @@ protected void stopRMProxy() { } } + private class MyNodeStatusUpdater6 extends NodeStatusUpdaterImpl { + + private final long rmStartIntervalMS; + private final boolean rmNeverStart; + public ResourceTracker resourceTracker; + public MyNodeStatusUpdater6(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + long rmStartIntervalMS, boolean rmNeverStart) { + super(context, dispatcher, healthChecker, metrics); + this.rmStartIntervalMS = rmStartIntervalMS; 
+ this.rmNeverStart = rmNeverStart; + } + + @Override + protected void serviceStart() throws Exception { + // no custom behavior: run the real NodeStatusUpdaterImpl startup + super.serviceStart(); + } + + private boolean isTriggered() { + return triggered; + } + + @Override + protected void stopRMProxy() { + return; + } + } + private class MyNodeManager extends NodeManager { private MyNodeStatusUpdater3 nodeStatusUpdater; @@ -1310,6 +1340,50 @@ protected NodeStatusUpdater createUpdater(Context context, } @Test (timeout = 150000) + public void testNMRMConnectionConf() throws Exception { + final long connectionWaitMs = 1000; + final long nmConnectionWaitMs = 10000; + final long connectionRetryIntervalMs = 1000; + //Waiting for rmStartIntervalMS, RM will be started + final long rmStartIntervalMS = 2*1000; + conf.setLong(YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + nmConnectionWaitMs); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + connectionWaitMs); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + connectionRetryIntervalMs); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + 1); + //Test NM try to connect to RM Several times, but finally fail + NodeManagerWithCustomNodeStatusUpdater nmWithUpdater; + nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { + @Override + protected NodeStatusUpdater createUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater6( + context, dispatcher, healthChecker, metrics, + rmStartIntervalMS, true); + return nodeStatusUpdater; + } + }; + nm.init(conf); + long waitStartTime = System.currentTimeMillis(); + try { + nm.start(); + Assert.fail("NM should have failed to start due to RM connect failure"); + } catch(Exception e) { + long t = System.currentTimeMillis(); + long duration = t - waitStartTime; + boolean waitTimeValid = (duration >= nmConnectionWaitMs); + + 
if(!waitTimeValid) { + // fail the test if NM doesn't retry long enough + Assert.fail("NM didn't try to connect to RM long enough"); + } + } + } + + @Test (timeout = 150000) public void testNMConnectionToRM() throws Exception { final long delta = 50000; final long connectionWaitMs = 5000;