diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5265cf1..20320b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -87,6 +87,9 @@ private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; + private long rmConnectWaitMS; + private long rmConnectionRetryIntervalMS; + private boolean waitForEver; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -187,12 +190,12 @@ protected ResourceTracker getRMClient() { private void registerWithRM() throws YarnRemoteException { Configuration conf = getConfig(); - long rmConnectWaitMS = + rmConnectWaitMS = conf.getInt( YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) * 1000; - long rmConnectionRetryIntervalMS = + rmConnectionRetryIntervalMS = conf.getLong( YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, YarnConfiguration @@ -205,7 +208,7 @@ private void registerWithRM() throws YarnRemoteException { " should not be negative."); } - boolean waitForEver = (rmConnectWaitMS == -1000); + waitForEver = (rmConnectWaitMS == -1000); if(! 
waitForEver) { if(rmConnectWaitMS < 0) { @@ -392,6 +395,9 @@ public void run() { while (!isStopped) { // Send heartbeat try { + NodeHeartbeatResponse response = null; + int rmRetryCount = 0; + long waitStartTime = System.currentTimeMillis(); NodeStatus nodeStatus = getNodeStatus(); nodeStatus.setResponseId(lastHeartBeatID); @@ -402,8 +408,31 @@ public void run() { request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context .getContainerTokenSecretManager().getCurrentKey()); } - NodeHeartbeatResponse response = - resourceTracker.nodeHeartbeat(request); + while (!isStopped) { + try { + rmRetryCount++; + response = resourceTracker.nodeHeartbeat(request); + break; + } catch (Throwable e) { + LOG.warn("Trying to heartbeat to ResourceManager, " + + "current no. of failed attempts is " + rmRetryCount); + if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS + || waitForEver) { + try { + LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 + + " seconds before next heartbeat to RM"); + Thread.sleep(rmConnectionRetryIntervalMS); + } catch(InterruptedException ex) { + // Sleep interrupted: ignore it; the loop re-checks isStopped before retrying + } + } else { + String errorMessage = "Failed to heartbeat to RM, " + + "no. of failed attempts is "+rmRetryCount; + LOG.error(errorMessage,e); + throw new YarnException(errorMessage,e); + } + } + } //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); // See if the master-key has rolled over @@ -448,6 +477,11 @@ public void run() { dispatcher.getEventHandler().handle( new CMgrCompletedAppsEvent(appsToCleanup)); } + } catch (YarnException e) { + // e.g. heartbeat retries exhausted the RM connect wait time: request NM shutdown, then rethrow + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + throw e; } catch (Throwable e) { // TODO Better error handling. Thread can die with the rest of the // NM still running. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 59b65e2..3683da3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -397,6 +397,15 @@ public void deleteBaseDir() throws IOException { @Test public void testNMRegistration() throws InterruptedException { + final long connectionWaitSecs = 5; + final long connectionRetryIntervalSecs = 1; + YarnConfiguration conf = createNMConfig(); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + connectionWaitSecs); + conf.setLong(YarnConfiguration + .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + connectionRetryIntervalSecs); + nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -406,7 +415,6 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } }; - YarnConfiguration conf = createNMConfig(); nm.init(conf); // verify that the last service is the nodeStatusUpdater (ie registration