diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 52fff14..51e0c43 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -833,6 +833,12 @@ private static void addDeprecatedKeys() {
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100;
+ /** Whether the NodeManager shuts itself down when it cannot connect to the RM. */
+ public static final String NM_SHUTSDWON_ON_RM_CONNECTION_FAILURES = // NOTE(review): "SHUTSDWON" misspells SHUTDOWN
+ NM_PREFIX + "shutdown.on.connection.failures";
+ public static final boolean DEFAULT_NM_SHUTSDOWN_ON_RM_CONNECTION_FAILURES =
+ true;
+
/**
* Prefix for disk configurations. Work in progress: This configuration
* parameter may be changed/removed in the future.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1dd88bd..659e333 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1145,6 +1145,12 @@
+ Specifies whether the NodeManager should shut down on connection failures. The NodeManager shuts
+ itself down if it is not able to connect to the RM.
+ yarn.nodemanager.shutdown.on.connection.failures
+ true
+
+
Max number of threads in NMClientAsync to process container
management events
yarn.client.nodemanager-client-async.thread-pool-max-size
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 8046228..850b119 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -131,6 +131,7 @@
private final NodeLabelsProvider nodeLabelsProvider;
private final boolean hasNodeLabelsProvider;
+ private boolean shutsDownNMonConnectionFailures; // set in serviceInit; gates the SHUTDOWN event on ConnectException
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -169,6 +170,10 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+ shutsDownNMonConnectionFailures =
+ conf.getBoolean(YarnConfiguration.NM_SHUTSDWON_ON_RM_CONNECTION_FAILURES,
+ YarnConfiguration.DEFAULT_NM_SHUTSDOWN_ON_RM_CONNECTION_FAILURES);
+
this.totalResource = Resource.newInstance(memoryMb, virtualCores);
metrics.addResource(totalResource);
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
@@ -750,9 +755,15 @@ public void run() {
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
- dispatcher.getEventHandler().handle(
- new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
- throw new YarnRuntimeException(e);
+ if (shutsDownNMonConnectionFailures) {
+ dispatcher.getEventHandler().handle(
+ new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+ throw new YarnRuntimeException(e);
+ } else {
+ LOG.error("Not shutting down NodeManager. Retry after default heartbeat interval time");
+ // keeping negative value to retry after default heartbeat interval
+ nextHeartBeatInterval = -1;
+ }
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index fc404de..62f7dcf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -25,6 +25,7 @@
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -433,6 +434,28 @@ protected void stopRMProxy() {
}
}
+ /** Status updater whose RM client is a MyResourceTracker8 stub. */
+ private class MyNodeStatusUpdater6 extends NodeStatusUpdaterImpl {
+   public ResourceTracker resourceTracker;
+   private Context context;
+
+   public MyNodeStatusUpdater6(Context nmContext, Dispatcher eventDispatcher,
+       NodeHealthCheckerService healthCheckerService, NodeManagerMetrics nmMetrics) {
+     super(nmContext, eventDispatcher, healthCheckerService, nmMetrics);
+     this.context = nmContext;
+     this.resourceTracker = new MyResourceTracker8(this.context);
+   }
+
+   @Override
+   protected ResourceTracker getRMClient() {
+     return this.resourceTracker;
+   }
+   @Override
+   protected void stopRMProxy() {
+     // Intentionally a no-op: the RM client here is a local stub object.
+   }
+ }
+
private class MyNodeManager extends NodeManager {
private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -482,6 +505,28 @@ protected void serviceStop() throws Exception {
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
}
}
+
+ /** NodeManager that builds a MyNodeStatusUpdater6 as its status updater. */
+ private class MyNodeManager3 extends NodeManager {
+   private MyNodeStatusUpdater6 nodeStatusUpdater;
+   @Override
+   protected NodeStatusUpdater createNodeStatusUpdater(Context nmContext,
+       Dispatcher eventDispatcher, NodeHealthCheckerService checker) {
+     MyNodeStatusUpdater6 updater =
+         new MyNodeStatusUpdater6(nmContext, eventDispatcher, checker, metrics);
+     this.nodeStatusUpdater = updater;
+     return updater;
+   }
+
+   public MyNodeStatusUpdater6 getNodeStatusUpdater() {
+     return nodeStatusUpdater;
+   }
+
+   @Override
+   protected void serviceStop() throws Exception {
+     super.serviceStop();
+   }
+ }
+
//
private class MyResourceTracker2 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
@@ -822,6 +867,35 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
}
+ /** ResourceTracker stub: canned registration reply, heartbeat always throws ConnectException. */
+ private class MyResourceTracker8 implements ResourceTracker {
+   public NodeAction registerNodeAction = NodeAction.NORMAL;
+   private final Context context;
+
+   MyResourceTracker8(Context nmContext) {
+     this.context = nmContext;
+   }
+
+   @Override
+   public RegisterNodeManagerResponse registerNodeManager(
+       RegisterNodeManagerRequest request) throws YarnException, IOException {
+     // Reply with the configured node action and master keys from createMasterKey().
+     RegisterNodeManagerResponse reply =
+         recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+     reply.setNodeAction(this.registerNodeAction);
+     reply.setContainerTokenMasterKey(createMasterKey());
+     reply.setNMTokenMasterKey(createMasterKey());
+     return reply;
+   }
+
+   @Override
+   public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+       throws YarnException, IOException, ConnectException {
+     // Always fail with a connection error so the NM's heartbeat-failure handling is exercised.
+     throw new ConnectException();
+   }
+ }
+
@Before
public void clearError() {
nmStartError = null;
@@ -1662,4 +1736,30 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
}
};
}
+
+ @Test
+ public void testNMShutdownOnConnectionFailures() throws Exception {
+   MyNodeManager3 nm = new MyNodeManager3();
+   YarnConfiguration conf = createNMConfig(); // method scope: reused by both scenarios
+   try {
+     conf.setBoolean(YarnConfiguration.NM_SHUTSDWON_ON_RM_CONNECTION_FAILURES, true);
+     nm.init(conf);
+     nm.start();
+     Thread.sleep(5000);
+     Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); // NM shut itself down
+   } finally {
+     nm.stop();
+   }
+   // Shutdown disabled: the NM must stay STARTED even though every heartbeat fails.
+   nm = new MyNodeManager3();
+   try {
+     conf.setBoolean(YarnConfiguration.NM_SHUTSDWON_ON_RM_CONNECTION_FAILURES, false);
+     nm.init(conf);
+     nm.start();
+     Thread.sleep(5000);
+     Assert.assertEquals(STATE.STARTED, nm.getServiceState());
+   } finally {
+     nm.stop();
+   }
+ }
}