diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6b660f7..ff171e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -871,6 +871,12 @@ private static void addDeprecatedKeys() { public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT = 100; + /** Specifies whether node manager should go down on connection failures. */ + public static final String NM_SHUTDOWN_ON_RM_CONNECTION_FAILURES = + NM_PREFIX + "shutdown.on.RM.connection.failures"; + public static final boolean DEFAULT_NM_SHUTDOWN_ON_RM_CONNECTION_FAILURES = + true; + /** Enable or disable node hardware capability detection. */ public static final String NM_ENABLE_HARDWARE_CAPABILITY_DETECTION = NM_PREFIX + "resource.detect-hardware-capabilities"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 621198c..74a11c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1206,6 +1206,12 @@ + Specifies whether node manager should go down on connection failures. NodeManager shuts itself down if + it is not able to connect to RM. 
+ yarn.nodemanager.shutdown.on.RM.connection.failures + true + + Max number of threads in NMClientAsync to process container management events yarn.client.nodemanager-client-async.thread-pool-max-size diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3721b0e..e242cf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; -import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -138,6 +137,7 @@ private final NodeLabelsProvider nodeLabelsProvider; private final boolean hasNodeLabelsProvider; + private boolean nmShutdownOnRMConnectionFailures; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -174,6 +174,10 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info("Nodemanager resources: memory set to " + memoryMb + "MB."); LOG.info("Nodemanager resources: vcores set to " + virtualCores + "."); + nmShutdownOnRMConnectionFailures = + 
conf.getBoolean(YarnConfiguration.NM_SHUTDOWN_ON_RM_CONNECTION_FAILURES, + YarnConfiguration.DEFAULT_NM_SHUTDOWN_ON_RM_CONNECTION_FAILURES); + this.totalResource = Resource.newInstance(memoryMb, virtualCores); metrics.addResource(totalResource); this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf); @@ -784,9 +788,15 @@ public void run() { } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); - throw new YarnRuntimeException(e); + if (nmShutdownOnRMConnectionFailures) { + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + throw new YarnRuntimeException(e); + } else { + LOG.error("Not shutting down NodeManager. Retry after default heartbeat interval time"); + // keeping negative value to retry after default heartbeat interval + nextHeartBeatInterval = -1; + } } catch (Throwable e) { // TODO Better error handling. 
Thread can die with the rest of the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index bc48adf..baa7239 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -25,6 +25,7 @@ import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -442,6 +443,28 @@ protected void stopRMProxy() { } } + private class MyNodeStatusUpdater6 extends NodeStatusUpdaterImpl { + public ResourceTracker resourceTracker; + private Context context; + + public MyNodeStatusUpdater6(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + this.context = context; + this.resourceTracker = new MyResourceTracker8(this.context); + } + + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override + protected void stopRMProxy() { + return; + } + } + private class MyNodeManager extends NodeManager { private MyNodeStatusUpdater3 nodeStatusUpdater; @@ -491,6 +514,28 @@ protected void serviceStop() throws Exception { syncBarrier.await(10000, TimeUnit.MILLISECONDS); } } + + private class MyNodeManager3 extends NodeManager { + private MyNodeStatusUpdater6 nodeStatusUpdater; + + @Override + protected NodeStatusUpdater 
createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + this.nodeStatusUpdater = + new MyNodeStatusUpdater6(context, dispatcher, healthChecker, metrics); + return this.nodeStatusUpdater; + } + + public MyNodeStatusUpdater6 getNodeStatusUpdater() { + return this.nodeStatusUpdater; + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + } + // private class MyResourceTracker2 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; @@ -866,6 +911,42 @@ public UnRegisterNodeManagerResponse unRegisterNodeManager( } } + private class MyResourceTracker8 implements ResourceTracker { + public NodeAction registerNodeAction = NodeAction.NORMAL; + private final Context context; + + MyResourceTracker8(Context context) { + this.context = context; + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, + IOException { + + RegisterNodeManagerResponse response = + recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(registerNodeAction); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + return response; + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnException, IOException, ConnectException { + // Throwing connection exception to verify the behavior of NM when it is failed to send heartbeat to RM + throw new ConnectException(); + } + + @Override + public UnRegisterNodeManagerResponse unRegisterNodeManager( + UnRegisterNodeManagerRequest request) throws YarnException, IOException { + return recordFactory + .newRecordInstance(UnRegisterNodeManagerResponse.class); + } + } + @Before public void clearError() { nmStartError = null; @@ -1706,4 +1787,30 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } }; } 
+
+  /**
+   * Verifies how the NodeManager reacts when every heartbeat to the RM fails
+   * with a ConnectException (MyResourceTracker8 always throws one from
+   * nodeHeartbeat).
+   *
+   * With NM_SHUTDOWN_ON_RM_CONNECTION_FAILURES set to true the NodeManager is
+   * expected to end up STOPPED; with it set to false the NodeManager is
+   * expected to remain STARTED.
+   *
+   * @throws Exception if the NodeManager fails to initialize, start or stop
+   */
+  @Test
+  public void testNMShutdownOnConnectionFailures() throws Exception {
+    // Declared before the try blocks because both scenarios below reuse it;
+    // a declaration inside the first try would be out of scope for the second.
+    YarnConfiguration conf = createNMConfig();
+
+    // Scenario 1: shutdown on connection failure enabled.
+    MyNodeManager3 nm = new MyNodeManager3();
+    try {
+      conf.setBoolean(YarnConfiguration.NM_SHUTDOWN_ON_RM_CONNECTION_FAILURES, true);
+      nm.init(conf);
+      nm.start();
+      // Give the status updater time to attempt a heartbeat and react.
+      Thread.sleep(5000);
+      Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+    } finally {
+      nm.stop();
+    }
+
+    // Scenario 2: shutdown on connection failure disabled.
+    nm = new MyNodeManager3();
+    try {
+      conf.setBoolean(YarnConfiguration.NM_SHUTDOWN_ON_RM_CONNECTION_FAILURES, false);
+      nm.init(conf);
+      nm.start();
+      // Give the status updater time to attempt a heartbeat and react.
+      Thread.sleep(5000);
+      Assert.assertEquals(STATE.STARTED, nm.getServiceState());
+    } finally {
+      nm.stop();
+    }
+  }
 }