Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Fixed
-
None
-
None
-
suse.
-
Reviewed
Description
See the code highlighted in red in the following file:
org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.java
protected void startStatusUpdater() {
new Thread("Node Status Updater") {
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartBeatID = 0;
while (!isStopped) {
// Send heartbeat
try {
synchronized (heartbeatMonitor)
// Before we send the heartbeat, we get the NodeStatus,
// whose method removes completed containers.
NodeStatus nodeStatus = getNodeStatus();
nodeStatus.setResponseId(lastHeartBeatID);
NodeHeartbeatRequest request = recordFactory
.newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
// But if the nodeHeartbeat fails, we've already removed the containers away to know about it. We aren't handling a nodeHeartbeat failure case here.
HeartbeatResponse response =
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
if (response.getNodeAction() == NodeAction.SHUTDOWN)
{ LOG .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," + " hence shutting down."); NodeStatusUpdaterImpl.this.stop(); break; }if (response.getNodeAction() == NodeAction.REBOOT)
{ LOG.info("Node is out of sync with ResourceManager," + " hence rebooting."); NodeStatusUpdaterImpl.this.reboot(); break; } lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanupList();
if (containersToCleanup.size() != 0)
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanupList();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0)
} catch (Throwable e)
{ // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); } }
}
}.start();
}
private NodeStatus getNodeStatus() {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId);
int numActiveContainers = 0;
List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext() {
Entry<ContainerId, Container> e = i.next();
ContainerId containerId = e.getKey();
Container container = e.getValue();
// Clone the container to send it to the RM
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
containersStatuses.add(containerStatus);
++numActiveContainers;
LOG.info("Sending out status for container: " + containerStatus);
// Here is the part that removes the completed containers.
if (containerStatus.getState() == ContainerState.COMPLETE) {
// Remove
i.remove();
LOG.info("Removed completed container " + containerId);
}
}
nodeStatus.setContainersStatuses(containersStatuses);
LOG.debug(this.nodeId + " sending out status for "
+ numActiveContainers + " containers");
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
nodeHealthStatus.setLastHealthReportTime(
healthChecker.getLastHealthReportTime());
if (LOG.isDebugEnabled())
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
nodeStatus.setKeepAliveApplications(keepAliveAppIds);
return nodeStatus;
}