diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index e3dae41..47ff988 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -87,7 +87,10 @@ private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - + private long rmConnectWaitMS; + private long rmConnectionRetryIntervalMS; + private boolean waitForEver; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -189,12 +192,12 @@ protected ResourceTracker getRMClient() { private void registerWithRM() throws YarnRemoteException { Configuration conf = getConfig(); - long rmConnectWaitMS = + rmConnectWaitMS = conf.getInt( YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) * 1000; - long rmConnectionRetryIntervalMS = + rmConnectionRetryIntervalMS = conf.getLong( YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, YarnConfiguration @@ -207,7 +210,7 @@ private void registerWithRM() throws YarnRemoteException { " should not be negative."); } - boolean waitForEver = (rmConnectWaitMS == -1000); + waitForEver = (rmConnectWaitMS == -1000); if(! waitForEver) { if(rmConnectWaitMS < 0) { @@ -398,19 +401,48 @@ public void run() { synchronized (heartbeatMonitor) { heartbeatMonitor.wait(heartBeatInterval); } - NodeStatus nodeStatus = getNodeStatus(); - nodeStatus.setResponseId(lastHeartBeatID); + HeartbeatResponse response = null; + int rmRetryCount = 0; + long waitStartTime = System.currentTimeMillis(); - NodeHeartbeatRequest request = recordFactory - .newRecordInstance(NodeHeartbeatRequest.class); - request.setNodeStatus(nodeStatus); - if (isSecurityEnabled()) { - request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey()); + while (true) { + try { + rmRetryCount++; + NodeStatus nodeStatus = getNodeStatus(); + nodeStatus.setResponseId(lastHeartBeatID); + + NodeHeartbeatRequest request = recordFactory + .newRecordInstance(NodeHeartbeatRequest.class); + request.setNodeStatus(nodeStatus); + if (isSecurityEnabled()) { + request + .setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey()); + } + response = resourceTracker.nodeHeartbeat(request) + .getHeartbeatResponse(); + break; + } catch (Throwable e) { + LOG.warn("Trying to connect to ResourceManager, " + + "current no. of failed attempts is " + rmRetryCount); + if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS + || waitForEver) { + try { + LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 + + " seconds before next connection retry to RM"); + Thread.sleep(rmConnectionRetryIntervalMS); + } catch(InterruptedException ex) { + //done nothing + } + } else { + String errorMessage = "Failed to Connect to RM, " + + "no. of failed attempts is "+rmRetryCount; + LOG.error(errorMessage,e); + throw new YarnException(errorMessage,e); + } + } } - HeartbeatResponse response = - resourceTracker.nodeHeartbeat(request).getHeartbeatResponse(); - + assert response != null; // See if the master-key has rolled over if (isSecurityEnabled()) { MasterKey updatedMasterKey = response.getMasterKey(); @@ -453,6 +485,9 @@ public void run() { dispatcher.getEventHandler().handle( new CMgrCompletedAppsEvent(appsToCleanup)); } + } catch (YarnException e) { + //catch and throw the exception if tried MAX wait time to connect RM + throw e; } catch (Throwable e) { // TODO Better error handling. Thread can die with the rest of the // NM still running. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index ff9e082..82db5be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -419,6 +419,16 @@ public void deleteBaseDir() throws IOException { @Test public void testNMRegistration() throws InterruptedException { + final long connectionWaitSecs = 5; + final long connectionRetryIntervalSecs = 1; + //Waiting for rmStartIntervalMS, RM will be started + YarnConfiguration conf = createNMConfig(); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + connectionWaitSecs); + conf.setLong(YarnConfiguration + .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + connectionRetryIntervalSecs); + nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -428,7 +438,6 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } }; - YarnConfiguration conf = createNMConfig(); nm.init(conf); // verify that the last service is the nodeStatusUpdater (ie registration