diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index e3dae41..d344e25 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -87,7 +87,10 @@ private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - + private long rmConnectWaitMS; + private long rmConnectionRetryIntervalMS; + private boolean waitForEver; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -189,12 +192,12 @@ protected ResourceTracker getRMClient() { private void registerWithRM() throws YarnRemoteException { Configuration conf = getConfig(); - long rmConnectWaitMS = + rmConnectWaitMS = conf.getInt( YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) * 1000; - long rmConnectionRetryIntervalMS = + rmConnectionRetryIntervalMS = conf.getLong( YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, YarnConfiguration @@ -207,7 +210,7 @@ private void registerWithRM() throws YarnRemoteException { " should not be negative."); } - boolean waitForEver = (rmConnectWaitMS == -1000); + waitForEver = (rmConnectWaitMS == -1000); if(! waitForEver) { if(rmConnectWaitMS < 0) { @@ -398,19 +401,48 @@ public void run() { synchronized (heartbeatMonitor) { heartbeatMonitor.wait(heartBeatInterval); } - NodeStatus nodeStatus = getNodeStatus(); - nodeStatus.setResponseId(lastHeartBeatID); + HeartbeatResponse response = null; + int rmRetryCount = 0; + long waitStartTime = System.currentTimeMillis(); - NodeHeartbeatRequest request = recordFactory - .newRecordInstance(NodeHeartbeatRequest.class); - request.setNodeStatus(nodeStatus); - if (isSecurityEnabled()) { - request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey()); + while (true) { + try { + rmRetryCount++; + NodeStatus nodeStatus = getNodeStatus(); + nodeStatus.setResponseId(lastHeartBeatID); + + NodeHeartbeatRequest request = recordFactory + .newRecordInstance(NodeHeartbeatRequest.class); + request.setNodeStatus(nodeStatus); + if (isSecurityEnabled()) { + request + .setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey()); + } + response = resourceTracker.nodeHeartbeat(request) + .getHeartbeatResponse(); + break; + } catch (Throwable e) { + LOG.warn("Trying to connect to ResourceManager, " + + "current no. of failed attempts is " + rmRetryCount); + if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS + || waitForEver) { + try { + LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 + + " seconds before next connection retry to RM"); + Thread.sleep(rmConnectionRetryIntervalMS); + } catch(InterruptedException ex) { + //done nothing + } + } else { + String errorMessage = "Failed to Connect to RM, " + + "no. of failed attempts is "+rmRetryCount; + LOG.error(errorMessage,e); + throw new YarnException(errorMessage,e); + } + } } - HeartbeatResponse response = - resourceTracker.nodeHeartbeat(request).getHeartbeatResponse(); - + assert response != null; // See if the master-key has rolled over if (isSecurityEnabled()) { MasterKey updatedMasterKey = response.getMasterKey(); @@ -453,6 +485,9 @@ public void run() { dispatcher.getEventHandler().handle( new CMgrCompletedAppsEvent(appsToCleanup)); } + } catch (YarnException e) { + //catch and throw the exception if tried MAX wait time to connect RM + throw e; } catch (Throwable e) { // TODO Better error handling. Thread can die with the rest of the // NM still running.