Hadoop Common / HADOOP-362

tasks can get lost when reporting task completion to the JobTracker has an error

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.5.0
    • Component/s: None
    • Labels: None

    Description

      Basically, the JobTracker used to lose some updates about successful map tasks and would assume that those tasks were still running (the old progress report is what it displayed on the web page). This caused the reduces to wait for map output that they would never receive, making the job appear hung.

      The following piece of code sends the status of tasks to the JobTracker:

      synchronized (this) {
        for (Iterator it = runningTasks.values().iterator();
             it.hasNext(); ) {
          TaskInProgress tip = (TaskInProgress) it.next();
          TaskStatus status = tip.createStatus();
          taskReports.add(status);
          if (status.getRunState() != TaskStatus.RUNNING) {
            if (tip.getTask().isMapTask()) {
              mapTotal--;
            } else {
              reduceTotal--;
            }
            it.remove();
          }
        }
      }

      //
      // Xmit the heartbeat
      //

      TaskTrackerStatus status =
        new TaskTrackerStatus(taskTrackerName, localHostname,
                              httpPort, taskReports,
                              failures);
      int resultCode = jobClient.emitHeartbeat(status, justStarted);


      Notice that the completed TIPs are removed from the runningTasks data structure. Now, if emitHeartbeat threw an exception (if it could not communicate with the JobTracker before the IPC timeout expired), this update is lost. The next time the tracker sends the heartbeat, the completed task's status is missing, so the JobTracker never learns about the completed task. One solution is to remove the completed TIPs from runningTasks only after emitHeartbeat returns. Here is how the new code would look:


      synchronized (this) {
        for (Iterator it = runningTasks.values().iterator();
             it.hasNext(); ) {
          TaskInProgress tip = (TaskInProgress) it.next();
          TaskStatus status = tip.createStatus();
          taskReports.add(status);
        }
      }

      //
      // Xmit the heartbeat
      //

      TaskTrackerStatus status =
        new TaskTrackerStatus(taskTrackerName, localHostname,
                              httpPort, taskReports,
                              failures);
      int resultCode = jobClient.emitHeartbeat(status, justStarted);

      synchronized (this) {
        for (Iterator it = runningTasks.values().iterator();
             it.hasNext(); ) {
          TaskInProgress tip = (TaskInProgress) it.next();
          if (tip.runstate != TaskStatus.RUNNING) {
            if (tip.getTask().isMapTask()) {
              mapTotal--;
            } else {
              reduceTotal--;
            }
            it.remove();
          }
        }
      }

      Attachments

        1. lost-status-updates.patch
          13 kB
          Owen O'Malley
        2. progress-update.patch
          14 kB
          Owen O'Malley

          Activity

            omalley Owen O'Malley added a comment -

            Your patch has a couple of issues. Here is a trial patch, but it needs a lot more testing. The issues are:
            1. If the RPC failed after the server updated the status, the status could be updated twice.
            2. The runningTasks could change from when the status report was generated.

            ddas Devaraj Das added a comment -

            Thanks Owen for putting this up on Jira! Well the code snippet that I sent you was a quick & dirty hack for the problem I was facing. Of course, yours is a much more elaborate solution. However, with this patch, the problem appears somewhere else - the reduces don't make progress. Even after all maps finish, the reduces remain stuck at 0% progress.
            I haven't yet fully analyzed your patch. I will do that.

            ddas Devaraj Das added a comment -

            Discovered a minor problem with this patch that caused status updates to never appear on the job status web page. The call to recomputeProgress for a particular task is conditioned on changedProgress, which is true only when at least 0.01 units of progress (in the range 0.0 - 1.0) have been seen since the last time progress was reported (by any tasktracker). Changing this figure to 0.00001 solved the problem.
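
            As a rough illustration of the guard being described (MIN_PROGRESS_DELTA, lastReportedProgress, and maybeRecomputeProgress below are hypothetical names, not the actual JobInProgress code), the condition might look like this sketch:

            // Hypothetical sketch of the changedProgress guard. A threshold of 0.01
            // suppresses small per-heartbeat increments; lowering it to 0.00001 lets
            // them reach the web page.
            private static final double MIN_PROGRESS_DELTA = 0.00001;
            private double lastReportedProgress = 0.0;

            void maybeRecomputeProgress(double newProgress) {
              boolean changedProgress =
                (newProgress - lastReportedProgress) >= MIN_PROGRESS_DELTA;
              if (changedProgress) {
                lastReportedProgress = newProgress;
                recomputeProgress();
              }
            }

            void recomputeProgress() { /* re-aggregate job-level progress here */ }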

            omalley Owen O'Malley added a comment -

            There is another aspect/cause of this that we just observed. What we saw was a large job with 5 maps that were not running (all maps were done) and yet all of the reduces were waiting for their output. Closer examination of the log showed that the maps had been logged as complete by the job tracker and were not being run anywhere. The TaskInProgress was showing 100% complete in the web UI. The Task details page, however, was showing 50% and running.

            My best guess as to the failure scenario is:

            1. task tracker sends progress of (50%, RUNNING) for map_123 to job tracker
            2. task tracker gets time out on progress message
            3. map_123 finishes
            4. task tracker sends progress of (100%, SUCCEED) for map_123 to job tracker
            5. the two messages are taken from the rpc queue and given to separate handler threads
            6. the SUCCEED message thread gets the JobTracker lock and updates the status of the Task and TaskInProgress.
            7. the RUNNING message thread gets the lock and updates the status of the Task
            8. reduces ask for the map output and nothing is available

            Therefore, we also need to make sure that Tasks are not allowed to move from SUCCEED or FAILED to RUNNING. That will solve this problem. However, it points to a much deeper and more pervasive problem that we will need to address: any two RPC calls from the same thread can be executed in an arbitrary order.
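
            As a rough sketch of that rule (not the actual patch; the currentStatus field and updateTaskStatus method below are assumptions), a status update would simply be dropped if it tried to move a finished task back to RUNNING:

            private TaskStatus currentStatus;

            // Hypothetical sketch: refuse updates that move a task out of a terminal
            // state (SUCCEEDED/FAILED) back to RUNNING, so a late, out-of-order
            // RUNNING report cannot "un-finish" a completed map.
            synchronized void updateTaskStatus(TaskStatus newStatus) {
              int oldState = currentStatus.getRunState();
              int newState = newStatus.getRunState();
              boolean terminal = (oldState == TaskStatus.SUCCEEDED
                                  || oldState == TaskStatus.FAILED);
              if (terminal && newState == TaskStatus.RUNNING) {
                return;   // stale report for a finished task: ignore it
              }
              currentStatus = newStatus;
            }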

            ddas Devaraj Das added a comment -

            Will timestamps help for the generic case? The thread invoking the RPC timestamps the message, and when a server handler pulls a call out of the queue for execution, it makes a note of the sender's timestamp. It then looks up a map from client address to client timestamp, and if the current call's timestamp happens to be older than the one found in the map, the server simply ignores the call. This means that we don't guarantee that all calls will be executed at the server.
            Additionally, we can have a flag that the client sets forcing the server to execute the call even if it violates the timeliness of the call. This may be required in the DFS operations where a client wants to, let's say, create a file (not absolutely sure whether this is a sensible use case), but in general this may be helpful.
            Both of the above can be implemented along the same lines as the call-id is handled today (it is a part of each RPC call). In fact, the call-id itself can serve as the timestamp. Does that make sense?
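
            A minimal sketch of that idea, purely for illustration (the class, map, and flag below are assumptions, not Hadoop's actual RPC code): the server remembers the newest call-id seen per client and drops anything older, unless the client flagged the call as "execute anyway".

            import java.net.InetSocketAddress;
            import java.util.HashMap;
            import java.util.Map;

            // Hypothetical server-side filter: track the newest call-id per client
            // and ignore stale calls unless the client forces execution.
            class StaleCallFilter {
              private final Map<InetSocketAddress, Integer> latestCallId = new HashMap<>();

              synchronized boolean shouldExecute(InetSocketAddress client,
                                                 int callId,
                                                 boolean forceExecution) {
                Integer newest = latestCallId.get(client);
                if (newest == null || callId > newest) {
                  latestCallId.put(client, callId);
                  return true;              // newest call from this client: run it
                }
                return forceExecution;      // stale call: run only if the client insists
              }
            }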

            cutting Doug Cutting added a comment -

            I worry about even relying on timestamps. In general, in distributed systems, communications can arrive out of order, no? So it is better to develop protocols that allow for this.

            ddas Devaraj Das added a comment -

            Yes, that's true. That's why my proposal (the second part) allows for this as well - the sender of the RPC explicitly flags an RPC message to indicate that it should be executed no matter when it is considered for execution. That is, even if a message m1 is received later than messages the client sent after m1, the server should still honor the flag and execute the RPC.
            So basically, the client decides whether the server should ignore all (not-yet-executed) RPC requests sent before the current RPC request that the server is executing, OR the server should execute all RPC requests.
            We need to serialize the execution of RPC requests based on client addresses (to avoid the problem of multiple requests from the same client getting executed in parallel by different handler threads). This will avoid the race condition for cases like job status updates.
            Yes, I agree that we can implement the above in the protocol itself. I mean, each protocol could have a flag signifying either "execute all RPCs" or "ignore RPC requests with timestamps older than the current RPC's timestamp".
            Since the RPC implementation in Hadoop has a single client for each protocol, I think this policy will work.

            omalley Owen O'Malley added a comment -

            The issue isn't that messages get handled out of order, but that messages from a single thread are handled out of order. In theory there shouldn't be a second message from the same thread. However, as we observed, it is possible if the first call times out.

            So you wouldn't want a global timestamp, just one per thread on the client. Another approach would be to hash the client thread and only allow a single RPC handler thread to handle RPCs from a given client thread.
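
            A rough sketch of that second idea, assuming hypothetical names (not Hadoop's actual RPC server code): every incoming call is routed to a handler chosen by hashing the client thread's identifier, so calls from one client thread are always processed serially, in order, by the same handler.

            import java.util.concurrent.ExecutorService;
            import java.util.concurrent.Executors;

            // Hypothetical dispatcher: hash the client thread id to pick a
            // single-threaded handler, serializing all calls from that thread.
            class HashedCallDispatcher {
              private final ExecutorService[] handlers;

              HashedCallDispatcher(int numHandlers) {
                handlers = new ExecutorService[numHandlers];
                for (int i = 0; i < numHandlers; i++) {
                  handlers[i] = Executors.newSingleThreadExecutor();
                }
              }

              void dispatch(long clientThreadId, Runnable call) {
                int idx = (int) ((clientThreadId & Long.MAX_VALUE) % handlers.length);
                handlers[idx].submit(call);
              }
            }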


            eric14 Eric Baldeschwieler added a comment -

            +1 to the second idea (a unique hashcode per client thread, forcing serialization on the server by mapping calls from that thread to the same handler thread).
            omalley Owen O'Malley added a comment -

            This patch makes the following changes:
            1. Changes the diagnostic info from a Vector to an ArrayList.
            2. Prevents tasks from moving from {FAILED, SUCCEEDED} to RUNNING.
            3. Only removes completed tasks from the runningTasks list after it is sure the job tracker got the finished message.
            4. Doesn't set the job's progress meter to 100% when the job is complete, to avoid counting the final reduce's final progress update twice.
            5. Passes the JobTrackerMetrics down into the procedures that actually decide when a task or job is finished, to avoid double counting.
            6. Renames some local variables to better reflect their values.

            cutting Doug Cutting added a comment -

            I just committed this. Thanks, Owen!


            People

              omalley Owen O'Malley
              ddas Devaraj Das