Hadoop YARN / YARN-101

If a heartbeat message is lost, the node status info for completed containers is lost too.

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.1.0-beta
    • Component/s: nodemanager
    • Labels:
      None
    • Environment:

      SUSE

    • Hadoop Flags:
      Reviewed

      Description

      See the annotated comments in the code below (highlighted in red in the original report):

      org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.java

      protected void startStatusUpdater() {

        new Thread("Node Status Updater") {
          @Override
          @SuppressWarnings("unchecked")
          public void run() {
            int lastHeartBeatID = 0;
            while (!isStopped) {
              // Send heartbeat
              try {
                synchronized (heartbeatMonitor) {
                  heartbeatMonitor.wait(heartBeatInterval);
                }

                // Before we send the heartbeat, we get the NodeStatus,
                // whose method removes completed containers.
                NodeStatus nodeStatus = getNodeStatus();

                nodeStatus.setResponseId(lastHeartBeatID);

                NodeHeartbeatRequest request = recordFactory
                    .newRecordInstance(NodeHeartbeatRequest.class);
                request.setNodeStatus(nodeStatus);

                // But if nodeHeartbeat fails, the completed containers have
                // already been removed, so the RM never learns about them.
                // The nodeHeartbeat failure case is not handled here.
                HeartbeatResponse response =
                    resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();

                if (response.getNodeAction() == NodeAction.SHUTDOWN) {
                  LOG.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
                      + " hence shutting down.");
                  NodeStatusUpdaterImpl.this.stop();
                  break;
                }

                if (response.getNodeAction() == NodeAction.REBOOT) {
                  LOG.info("Node is out of sync with ResourceManager,"
                      + " hence rebooting.");
                  NodeStatusUpdaterImpl.this.reboot();
                  break;
                }

                lastHeartBeatID = response.getResponseId();
                List<ContainerId> containersToCleanup = response
                    .getContainersToCleanupList();
                if (containersToCleanup.size() != 0) {
                  dispatcher.getEventHandler().handle(
                      new CMgrCompletedContainersEvent(containersToCleanup));
                }

                List<ApplicationId> appsToCleanup =
                    response.getApplicationsToCleanupList();
                // Only start tracking for keepAlive on FINISH_APP
                trackAppsForKeepAlive(appsToCleanup);
                if (appsToCleanup.size() != 0) {
                  dispatcher.getEventHandler().handle(
                      new CMgrCompletedAppsEvent(appsToCleanup));
                }
              } catch (Throwable e) {
                // TODO Better error handling. Thread can die with the rest of the
                // NM still running.
                LOG.error("Caught exception in status-updater", e);
              }
            }
          }
        }.start();
      }

      private NodeStatus getNodeStatus() {

        NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
        nodeStatus.setNodeId(this.nodeId);

        int numActiveContainers = 0;
        List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
        for (Iterator<Entry<ContainerId, Container>> i =
            this.context.getContainers().entrySet().iterator(); i.hasNext();) {
          Entry<ContainerId, Container> e = i.next();
          ContainerId containerId = e.getKey();
          Container container = e.getValue();

          // Clone the container to send it to the RM
          org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
              container.cloneAndGetContainerStatus();
          containersStatuses.add(containerStatus);
          ++numActiveContainers;
          LOG.info("Sending out status for container: " + containerStatus);

          // Here is the part that removes the completed containers.
          if (containerStatus.getState() == ContainerState.COMPLETE) {
            // Remove
            i.remove();

            LOG.info("Removed completed container " + containerId);
          }
        }
        nodeStatus.setContainersStatuses(containersStatuses);

        LOG.debug(this.nodeId + " sending out status for "
            + numActiveContainers + " containers");

        NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
        nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
        nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
        nodeHealthStatus.setLastHealthReportTime(
            healthChecker.getLastHealthReportTime());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
              + ", " + nodeHealthStatus.getHealthReport());
        }

        nodeStatus.setNodeHealthStatus(nodeHealthStatus);

        List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
        nodeStatus.setKeepAliveApplications(keepAliveAppIds);

        return nodeStatus;
      }
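
      One possible way to avoid the loss described above is to build the status report without removing anything, and to drop completed containers only after the heartbeat call has returned normally. The following is a minimal, self-contained sketch of that idea. It is not Hadoop code and not necessarily what the attached patches do: HeartbeatSketch, State, ResourceTracker, snapshotStatuses and heartbeatOnce are all hypothetical names, and containers are modeled as plain strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of the NM status updater; not Hadoop code. */
class HeartbeatSketch {
  enum State { RUNNING, COMPLETE }

  /** Stand-in for the RM call; throwing simulates a lost heartbeat. */
  interface ResourceTracker {
    void nodeHeartbeat(Map<String, State> statuses) throws Exception;
  }

  // Stand-in for context.getContainers(): container id -> current state.
  private final Map<String, State> containers = new LinkedHashMap<>();

  void put(String containerId, State state) {
    containers.put(containerId, state);
  }

  Map<String, State> containers() {
    return containers;
  }

  /** Copy every container status without removing anything. */
  Map<String, State> snapshotStatuses() {
    return new LinkedHashMap<>(containers);
  }

  /**
   * Send one heartbeat. Completed containers are dropped only after the
   * call returns normally, so a failed heartbeat leaves them in place to
   * be reported again on the next attempt.
   */
  boolean heartbeatOnce(ResourceTracker tracker) {
    Map<String, State> sent = snapshotStatuses();
    try {
      tracker.nodeHeartbeat(sent);
    } catch (Exception e) {
      // Heartbeat lost: nothing has been removed, so nothing is forgotten.
      return false;
    }
    for (Map.Entry<String, State> e : sent.entrySet()) {
      if (e.getValue() == State.COMPLETE) {
        // Remove only the entries that were reported as COMPLETE.
        containers.remove(e.getKey(), State.COMPLETE);
      }
    }
    return true;
  }
}
```

      With this ordering, a completed container "c1" survives a heartbeatOnce call whose tracker throws, and is removed only by a later call that succeeds. The trade-off is that the RM may see the same completed status more than once, so the receiving side would need to tolerate duplicates.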

        Attachments

        1. YARN-101.1.patch
          3 kB
          Xuan Gong
        2. YARN-101.2.patch
          15 kB
          Xuan Gong
        3. YARN-101.3.patch
          16 kB
          Xuan Gong
        4. YARN-101.4.patch
          16 kB
          Xuan Gong
        5. YARN-101.5.patch
          16 kB
          Xuan Gong
        6. YARN-101.6.patch
          18 kB
          Xuan Gong

            People

            • Assignee:
              Xuan Gong (xgong)
            • Reporter:
              xieguiming
            • Votes:
              0
            • Watchers:
              11