Hadoop Map/Reduce
MAPREDUCE-2177

The wait for spill completion should call Condition.awaitNanos(long nanosTimeout)

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.20.2
    • Fix Version/s: None
    • Component/s: tasktracker
    • Labels: None

      Description

      We sometimes saw map task timeouts in cdh3b2. Here is the log from one of the map tasks:

      2010-11-04 10:34:23,820 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full= true
      2010-11-04 10:34:23,820 INFO org.apache.hadoop.mapred.MapTask: bufstart = 119534169; bufend = 59763857; bufvoid = 298844160

      2010-11-04 10:34:23,820 INFO org.apache.hadoop.mapred.MapTask: kvstart = 438913; kvend = 585320; length = 983040
      2010-11-04 10:34:41,615 INFO org.apache.hadoop.mapred.MapTask: Finished spill 3
      2010-11-04 10:35:45,352 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full= true

      2010-11-04 10:35:45,547 INFO org.apache.hadoop.mapred.MapTask: bufstart = 59763857; bufend = 298837899; bufvoid = 298844160
      2010-11-04 10:35:45,547 INFO org.apache.hadoop.mapred.MapTask: kvstart = 585320; kvend = 731585; length = 983040

      2010-11-04 10:45:41,289 INFO org.apache.hadoop.mapred.MapTask: Finished spill 4

      Note how long the last spill took.

      In MapTask.java, the following code waits for the spill to finish:

      while (kvstart != kvend) {
        reporter.progress();
        spillDone.await();
      }

      The code in trunk is similar.

      There is no timeout mechanism in Condition.await(): once the collecting thread blocks here, it cannot report progress again until the SpillThread calls spillDone.signal(). If the spill takes long enough, the task is killed for failing to report progress. Condition.awaitNanos(long nanosTimeout) should be called instead, so the loop wakes up periodically and reporter.progress() keeps getting called.
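
      For illustration, a minimal sketch of the kind of change being proposed. The one-second interval, the class wrapper, and the field declarations are assumptions made to keep the example compilable; this is not an actual patch:

      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.locks.Condition;
      import java.util.concurrent.locks.ReentrantLock;

      import org.apache.hadoop.util.Progressable;

      class SpillWaitSketch {
        private final ReentrantLock spillLock = new ReentrantLock();
        private final Condition spillDone = spillLock.newCondition();
        private volatile int kvstart, kvend;

        // Must be called with spillLock held, as in MapTask.MapOutputBuffer.
        // Instead of blocking indefinitely in await(), wake up periodically so
        // reporter.progress() keeps firing while the SpillThread drains the buffer.
        void waitForSpill(Progressable reporter) throws InterruptedException {
          final long waitNanos = TimeUnit.SECONDS.toNanos(1); // illustrative interval
          while (kvstart != kvend) {
            reporter.progress();
            spillDone.awaitNanos(waitNanos); // returns early if signalled
          }
        }
      }

      A signal from the SpillThread still wakes the waiter immediately; the timeout only bounds how long progress reporting can be starved.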


          Activity

          Chris Douglas added a comment -

          It is forced to block because the buffer is full. Returning from collect without serializing the emitted record would be an error, as would serializing the record over data allocated to the spill. Changing the call as you suggest would affect correctness, unless you're arguing that the task should fail if the spill takes more than some set amount of time. If the task timeout is killing the task, then it's working as designed, and equivalently to the proposed mechanism.

          There are many reasons the spill could take a long time. Running with a combiner, using a non-RawComparator, spilling to a failing/slow disk, etc. It's possible you're seeing a race condition that causes the collection thread to miss the signal, but the fix would not be to add a timeout to the wait, but to fix the locking. Can you get a stack trace from a map task stuck in this state? If the job is rerun over the same data, do the same tasks hang? Do the timeouts occur on particular machines? Does the task succeed on later attempts on different machines?

          Todd Lipcon added a comment -

          Chris, I think Ted's point is not that it should return after a timeout, but that it should call reporter.progress and then go back to waiting. This seems valid - if the mapper thread is blocked because the buffer is full, either the buffer spill thread should be calling progress() as it spills the buffer to disk, or the blocked thread should periodically unblock to call progress(), don't you think?

          I think so long as the spiller is actually making some progress getting bytes to disk, it shouldn't cause a task failure - this kind of "alive but very slow" scenario is supposed to be handled by speculation rather than suicide

          Ted Yu added a comment -

          SpillThread doesn't currently have a reference to the TaskReporter.
          It is easier to use a short timeout for spillDone.awaitNanos() so that Buffer.write() can call progress().

          Chris Douglas added a comment -

          SpillThread doesn't currently have a reference to the TaskReporter.
          It is easier to use a short timeout for spillDone.awaitNanos() so that Buffer.write() can call progress().

          That prevents the task from being killed, but its semantics are incorrect. Todd's suggestion- calling progress() during the merge- at least ensures that the task is doing work; reporting progress from a thread that isn't actually proceeding is broken. Isn't progress already reported during the merge? Can you provide more detail on the environment where you're observing this?

          Arun C Murthy added a comment -

          calling progress() during the merge- at least ensures that the task is doing work; reporting progress from a thread that isn't actually proceeding is broken. Isn't progress already reported during the merge? Can you provide more detail on the environment where you're observing this?

          Chris, we've seen this at Y! too, this might just be a bug in progress reporting during the merge.

          Ted Yu added a comment -

          I didn't capture a stack trace of the MapTask when this happened - will do next time.

          We can add a call to reporter.progress() in sortAndSpill().
          But since we don't know how long each call to writer.append() / combinerRunner.combine() would take, there is no guarantee that we can prevent this issue from happening.
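
          For illustration, roughly what that could look like inside the per-partition write loop. The record interval and the stand-in method names below are made up, not actual MapTask code:

          // Schematic sketch only: ping the reporter every few thousand records so a
          // long run of writer.append()/combinerRunner.combine() calls cannot starve
          // progress reporting. recordsRemainInPartition()/appendNextRecord() are
          // stand-ins for the real iteration in sortAndSpill().
          final int recordsBetweenProgress = 10000; // assumed interval
          long written = 0;
          while (recordsRemainInPartition()) {
            appendNextRecord(writer);
            if (++written % recordsBetweenProgress == 0) {
              reporter.progress();
            }
          }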

          Ted Yu added a comment -

          The occurrence in our cluster may have something to do with the fact that we run an HBase region server alongside the task tracker.

          Reporting progress from a thread that isn't blocked by a long disk write or combiner call is one option. We can put a limit on the total amount of time the spillDone.awaitNanos() calls take in the following loop:

          while (kvstart != kvend) {
            reporter.progress();
            spillDone.awaitNanos(nanosTimeout);
          }
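
          Bounding the total wait could look roughly like this; the ten-minute cap and one-second interval are arbitrary example values, and kvstart, kvend, reporter, spillDone are assumed to be the existing MapOutputBuffer fields with spillLock held:

          // Sketch only: keep reporting progress while waiting, but give up and let
          // the task fail if the spill has not finished within an overall limit.
          final long intervalNanos = TimeUnit.SECONDS.toNanos(1);
          long remainingNanos = TimeUnit.MINUTES.toNanos(10);
          while (kvstart != kvend) {
            if (remainingNanos <= 0) {
              throw new IOException("Spill did not complete in time");
            }
            reporter.progress();
            // awaitNanos() returns the time left, so subtract what was actually spent waiting
            remainingNanos -= intervalNanos - spillDone.awaitNanos(intervalNanos);
          }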
          Chris Douglas added a comment -

          The progress reporting during the merge is not on every record emitted. For jobs with combiners that emit far fewer records than they consume, it's possible that the framework fails to report progress, though (1) IIRC it reports at least once for every partition and (2) that wouldn't explain why the job is taking so much longer for a particular spill.

          Adding some reporting in the reader could make sense, but we could use more information. Adding progress reporting only to prevent the job from being killed may be the wrong fix.

          But since we don't know how long each call to writer.append() / combinerRunner.combine() would take, there is no guarantee that we can prevent this issue from happening.

          If the task is stuck, then it should be killed. I agree that the timeout mechanism's granularity is too coarse to measure all progress, but the overhead of measuring every event is too high to be the default.

          Reporting progress from a thread that isn't blocked by a long disk write or combiner call is one option. We can put a limit on the total amount of time the spillDone.awaitNanos() calls take in the following loop:

          Again, that thread isn't making progress. It shouldn't prevent the task from getting killed if the merge is truly stuck.

          Ted, please provide some details on the job you're running (w/ combiner? do reexecutions succeed? does this happen on particular machines? do other tasks complete normally while another is in this state?).

          Ted Yu added a comment -

          When this issue appears again, I will collect more details.

          Luke Lu added a comment -

          The combiner code path doesn't report progress at a configured interval, so the task can time out when the map output is large. The right fix is probably for CombineOutputCollector#collect to report progress a la Merger#writeFile.
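
          A rough sketch of that idea. The collected and progressable fields and the 10000-record interval are assumptions for illustration; a real fix would presumably read the interval from configuration, as Merger#writeFile does:

          // Illustrative only: report progress every N records collected from the
          // combiner, so a combiner that emits few records cannot starve the reporter.
          // "collected" and "progressable" are assumed fields of CombineOutputCollector.
          public synchronized void collect(K key, V value) throws IOException {
            outCounter.increment(1);
            writer.append(key, value);
            if ((++collected % 10000) == 0) {
              progressable.progress();
            }
          }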

          Anupam Seth added a comment -

          Found MAPREDUCE-2187 to be more closely related to the fix I was intending (though this issue seems closely related per the previous comment), so I am putting back the original description and revoking ownership.


            People

            • Assignee: Unassigned
            • Reporter: Ted Yu
            • Votes: 0
            • Watchers: 5
