  1. Hadoop Common
  2. HADOOP-362

Tasks can get lost when reporting task completion to the JobTracker fails

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • None
    • 0.5.0
    • None
    • None

    Description

      The JobTracker can lose status updates about successfully completed map tasks and then assumes that those tasks are still running (the web page keeps displaying the last progress report it received). The reduces then wait for map output that they never receive, so the job appears to hang.

      The following piece of code sends the status of tasks to the JobTracker:

      synchronized (this) {
        for (Iterator it = runningTasks.values().iterator();
             it.hasNext(); ) {
          TaskInProgress tip = (TaskInProgress) it.next();
          TaskStatus status = tip.createStatus();
          taskReports.add(status);
          if (status.getRunState() != TaskStatus.RUNNING) {
            if (tip.getTask().isMapTask()) {
              mapTotal--;
            } else {
              reduceTotal--;
            }
            it.remove();
          }
        }
      }

      //
      // Xmit the heartbeat
      //
      TaskTrackerStatus status =
        new TaskTrackerStatus(taskTrackerName, localHostname,
                              httpPort, taskReports,
                              failures);
      int resultCode = jobClient.emitHeartbeat(status, justStarted);


      Notice that completed TIPs are removed from the runningTasks data structure before the heartbeat is sent. If emitHeartbeat throws an exception (for example, because it cannot communicate with the JobTracker before the IPC timeout expires), the update is lost: the next heartbeat no longer includes the completed task's status, so the JobTracker never learns that the task completed. One solution is to remove the completed TIPs from runningTasks only after emitHeartbeat returns. Here is how the new code would look:


      synchronized (this) {
        for (Iterator it = runningTasks.values().iterator();
             it.hasNext(); ) {
          TaskInProgress tip = (TaskInProgress) it.next();
          TaskStatus status = tip.createStatus();
          taskReports.add(status);
        }
      }

      //
      // Xmit the heartbeat
      //
      TaskTrackerStatus status =
        new TaskTrackerStatus(taskTrackerName, localHostname,
                              httpPort, taskReports,
                              failures);
      int resultCode = jobClient.emitHeartbeat(status, justStarted);

      synchronized (this) {
        for (Iterator it = runningTasks.values().iterator();
             it.hasNext(); ) {
          TaskInProgress tip = (TaskInProgress) it.next();
          if (tip.runstate != TaskStatus.RUNNING) {
            if (tip.getTask().isMapTask()) {
              mapTotal--;
            } else {
              reduceTotal--;
            }
            it.remove();
          }
        }
      }
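
      The "remove only after the heartbeat returns" idea can be tried in isolation with the small, self-contained sketch below. It is not the Hadoop code: HeartbeatSketch, RunState, Tracker, setState and trackedTasks are hypothetical stand-ins invented for this illustration. One detail also differs from the proposal above and is an assumption of this sketch only: it removes tasks based on the snapshot that was actually sent, so a task that finishes while the heartbeat is in flight is still reported on the next heartbeat.

```java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the TaskTracker side; not the real Hadoop classes.
class HeartbeatSketch {
    enum RunState { RUNNING, SUCCEEDED }

    /** Hypothetical stand-in for the heartbeat RPC to the JobTracker. */
    interface Tracker {
        void emitHeartbeat(Map<String, RunState> reports) throws IOException;
    }

    private final Map<String, RunState> runningTasks = new LinkedHashMap<>();

    synchronized void setState(String taskId, RunState state) {
        runningTasks.put(taskId, state);
    }

    synchronized int trackedTasks() {
        return runningTasks.size();
    }

    /** Sends one heartbeat and forgets only tasks whose finished state was delivered. */
    void heartbeat(Tracker tracker) throws IOException {
        Map<String, RunState> reports;
        synchronized (this) {
            // Snapshot the statuses, but do not remove anything yet.
            reports = new LinkedHashMap<>(runningTasks);
        }

        // If this throws, runningTasks is untouched, so the next
        // heartbeat reports the completed tasks again.
        tracker.emitHeartbeat(reports);

        synchronized (this) {
            for (Map.Entry<String, RunState> e : reports.entrySet()) {
                if (e.getValue() != RunState.RUNNING) {
                    // Removes the entry only if its state is still the one that was sent.
                    runningTasks.remove(e.getKey(), e.getValue());
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        HeartbeatSketch tt = new HeartbeatSketch();
        tt.setState("map_0001", RunState.SUCCEEDED);
        try {
            // Simulate a heartbeat that fails, e.g. on an IPC timeout.
            tt.heartbeat(r -> { throw new IOException("IPC timeout"); });
        } catch (IOException expected) {
            System.out.println("heartbeat failed, still tracking " + tt.trackedTasks());
        }
        // The retry still carries the completed task's status.
        tt.heartbeat(r -> System.out.println("delivered " + r));
        System.out.println("now tracking " + tt.trackedTasks());
    }
}
```

      In this sketch a failed heartbeat leaves the completed task in the map, and the task is dropped only after a later heartbeat delivers its final state.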

      Attachments

        1. progress-update.patch
          14 kB
          Owen O'Malley
        2. lost-status-updates.patch
          13 kB
          Owen O'Malley

            People

              omalley Owen O'Malley
              ddas Devaraj Das