Hadoop Common: HADOOP-1431

Map tasks can't timeout for failing to call progress

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.13.0
    • Fix Version/s: 0.13.0
    • Component/s: None
    • Labels: None

      Description

      Currently the map task runner creates a thread that calls progress every second to keep the system from killing the map if the sort takes too long. This is the wrong approach, because it will cause stuck tasks to not be killed. The right solution is to have the sort call progress as it actually makes progress. This is part of what is going on in HADOOP-1374. A map gets stuck at 100% progress, but not done.
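
      (For illustration, a minimal sketch of the always-on reporting pattern the description criticizes. Class and field names are assumptions, Reporter stands for Hadoop's progress-reporting callback, and this is not the actual MapTask code.)

      class UnconditionalProgressThread extends Thread {
        private final Reporter reporter;          // assumed progress callback

        UnconditionalProgressThread(Reporter reporter) {
          this.reporter = reporter;
          setDaemon(true);
        }

        public void run() {
          try {
            while (true) {
              reporter.progress();                // fires every second, whether or not real work advances
              Thread.sleep(1000);
            }
          } catch (InterruptedException ie) {
            // exits only when the whole task finishes, so a stuck sort still looks alive
          }
        }
      }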

      Attachments

      1. 1431.patch
        6 kB
        Devaraj Das
      2. HADOOP-1431_3_20070601.patch
        7 kB
        Arun C Murthy
      3. HADOOP-1431_2_20070530.patch
        7 kB
        Arun C Murthy
      4. HADOOP-1431_1_20070525.patch
        12 kB
        Arun C Murthy

        Activity

        Devaraj Das added a comment -

        By the way, this issue exists for ReduceTasks also. We have threads there too for reporting progress (one candidate there is a merge that might get stuck due to faulty user code). Owen, since it has been proven that this issue is not a cause of HADOOP-1374, could we postpone these fixes to 0.14 (mostly because we don't have real/reported instances of these problems yet)? I am fine either way though.

        Arun C Murthy added a comment -

        Here is a reasonably straightforward patch to address the concerns raised in this issue - basically I have implemented a ReportingComparator which sends a progress update every 100 comparisons, and this comparator is used for sorting/merging in both MapTask & ReduceTask.

        The idea is that the 'compare' operation is a metric independent of the actual sorting/merging algorithm and hence a good indicator of the 'progress' being made by the sort/merge done by the framework in the map/reduce tasks...

        I have adopted a policy similar to the one already employed in MapTask, where the RecordReader sends progress updates depending on the number of bytes consumed from the input file: the ReportingComparator wraps a comparator and a reporter object and sends an update every 100 comparisons. The advantage is that the sort algorithm (which could be user code, e.g. by extending BasicTypeSorterBase) is blissfully unaware of the reporting going on under the covers, and it also ensures that there is no way even user-supplied comparators (e.g. JobConf.getOutputValueGroupingComparator()) can bypass this reporting mechanism.

        Appreciate review/feedback while I continue testing... I know Devaraj has some. :)
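
        (For illustration, a rough sketch of the wrapping idea described above. Names and generics are assumptions; the attached patch may differ in detail.)

        class ReportingComparator<T> implements java.util.Comparator<T> {
          private final java.util.Comparator<T> wrapped;
          private final Reporter reporter;        // assumed progress callback
          private int count = 0;

          ReportingComparator(java.util.Comparator<T> wrapped, Reporter reporter) {
            this.wrapped = wrapped;
            this.reporter = reporter;
          }

          public int compare(T a, T b) {
            if (++count % 100 == 0) {             // send an update every 100 comparisons
              reporter.progress();
            }
            return wrapped.compare(a, b);
          }
        }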

        Doug Cutting added a comment -

        Comparators can be performance-sensitive, so I'd like to see some benchmarks of a ReportingComparator before we accept that solution.

        The long-term best patch for this would be to modify SequenceFile to actually report progress as it sorts. This is a long-standing issue.

        The current bug is that the progress thread is running when there's no sorting going on. It should only run during calls to MapTask#sortAndSpillToDisk(), not during the entire map. This should be easy to patch for the short term.

        Owen O'Malley added a comment -

        I'm sure we all agree that in the medium term, sort needs to report progress. But I agree that it will cause confusion and possibly slowdowns if we put the reporting in the comparator. I think the easiest solution is to give the auto-progress thread two new methods:

        void suspendAutoProgress();
        void resumeAutoProgress();

        and start/stop auto progress when you are in the sort.
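
        (For illustration, a minimal sketch of what such a suspendable auto-progress thread could look like. Names are assumptions; this is not the committed code.)

        class AutoProgressThread extends Thread {
          private final Reporter reporter;        // assumed progress callback
          private boolean active = false;

          AutoProgressThread(Reporter reporter) {
            this.reporter = reporter;
            setDaemon(true);
          }

          synchronized void suspendAutoProgress() { active = false; }
          synchronized void resumeAutoProgress()  { active = true; notify(); }

          public void run() {
            try {
              while (!isInterrupted()) {
                synchronized (this) {
                  while (!active) { wait(); }     // parked except while the sort runs
                }
                reporter.progress();
                Thread.sleep(1000);
              }
            } catch (InterruptedException ie) {
              // task finished; exit
            }
          }
        }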

        Devaraj Das added a comment -

        The main requirement we are after in this issue is that we need to allow sort to report progress. From the architecture point of view, I think it makes sense to have at least the MapReduce-kernel part of sort aware of that - i.e., the generic BufferSorter.
        My major objection to this patch is that we are kind of short-circuiting things, making it look hacky IMO. I would much rather do it the following way:
        1) Add a method to the BufferSorter interface called setReporter(Reporter).
        2) Implementors of the interface, in this case BasicTypeSorterBase, would implement the method and, in this case, would just store the reporter object. This is similar to the BufferSorter.setInputBuffer method.
        3) BasicTypeSorterBase would periodically invoke reporter.progress() to report progress. The compare method in the BasicTypeSorterBase class is a potential place where reporter.progress can be called.
        This way, we don't make the sort library (currently the MergeSorter and MergeSort classes) aware of the Reporter object but keep everything in the MapReduce kernel. This preserves the boundaries that I originally intended to have between the various layers (HADOOP-331).

        For the ReduceTask, we have threads for reporting progress in two phases:
        1) during the shuffle (and here we implicitly do the progress reporting for the ramfs merges too)
        2) during the merge of the on-disk files in the reduce phase
        The thread for the first case is still there in the current patch. If we really want to remove the issue, we should ideally remove the thread for the shuffle too, since the ramfs merge might also get stuck (user code is involved there).

        Similar to BufferSorter, we could have an API for merge that takes a Reporter object and calls reporter.progress periodically. ReduceTask, as well as the final merge in the MapTask, could use that for the merges. Again, the argument here is that we do expect merge to report progress to us and hence we enable it to do so.
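
        (For illustration, a sketch of the setReporter approach outlined in steps 1-3 above. The interfaces are simplified, only the reporting wiring is shown, and the reporting interval and doCompare hook are assumptions.)

        interface BufferSorter {
          void setReporter(Reporter reporter);    // proposed new method
          // ... existing methods such as setInputBuffer elided
        }

        abstract class BasicTypeSorterBase implements BufferSorter {
          protected Reporter reporter;
          private int comparisons = 0;

          public void setReporter(Reporter reporter) {
            this.reporter = reporter;             // just store it, like setInputBuffer
          }

          protected int compare(int i, int j) {
            if (reporter != null && ++comparisons % 1000 == 0) {
              reporter.progress();                // periodic report from the kernel, not the sort library
            }
            return doCompare(i, j);               // hypothetical hook for the actual key comparison
          }

          protected abstract int doCompare(int i, int j);
        }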

        Doug Cutting added a comment -

        I think we could equivalently and perhaps more simply and reliably launch and kill progress threads per call to sortAndSpillToDisk(). Thread spawning in Java is cheap, especially relative to sorting 100MB of data. Suspending and resuming requires careful synchronization, which is error-prone.

        In any case, we want to stop progress reporting in a 'finally' clause around sortAndSpillToDisk().

        Doug Cutting added a comment -

        Devaraj, your solution sounds good for 0.14, but I don't think we ought to make a change of that scale for 0.13. A thread stopped in a 'finally' clause run during sorts has been reliable enough for a long time. It's not ideal, but it is workable. A bug was recently introduced where the scope of the thread grew too large. We should fix that bug for 0.13 and address the larger issue of improved progress reporting during sort for 0.14.

        Owen O'Malley added a comment -

        Either starting a new thread or causing the same one to start and stop seems like the best fix for 13.

        Devaraj Das added a comment -

        Doug, I agree with you that this issue should be handled more generally as part of HADOOP-1201, scheduled for 0.14. That's why I put a comment (the first comment on this issue) to that effect when Owen raised the bug. I believe that the sort progress reporting as it is done today has been working fine for quite some time (many months actually), and I can't remember what bug got introduced there (sorry). The only reason why sort could get stuck is bad user code in the Comparator, and I am not convinced that we would have handled that issue completely without handling the merge cases also.
        On a side note, one problem that exists today is that the child Map/Reduce processes sometimes (rarely, on Linux), for some reason, don't exit even after the map/reduce method invocations are over (TaskRunner.run() doesn't exit, hence tracker.reportTaskFinished(t.getTaskId()) is not called, and finally the TaskTracker kills the task after the timeout interval in the method markUnresponsiveTasks).
        But again, I am happy if we agree that we should look at this issue in more detail for 0.14. :)

        Vivek Ratan added a comment -

        As part of a good solution (for 0.14 or later), I think we should separate out reporting of progress by the sort/merge/user code from reporting progress from the Task to the TaskTracker.

        For the former, we make the Reporter object available to the MapReduce kernel code, as Devaraj suggested, and at other appropriate places as discussed in this conversation. Wherever progress is made that we need to report (during sort or merge or whatever), the kernel code or the user's code calls the Reporter object.

        Separately, for the latter, we probably should continue with the Progress thread. This thread looks at the Progress data structures and sends progress info to the TaskTracker via RPC. To avoid the problem that this bug was filed for, we have two likely options:
        1. The thread continues doing what it does today: it sends the progress information at regular intervals, and the TaskTracker decides whether the task has really made progress based on what it got earlier. Or
        2. The thread decides whether progress has really been made and makes an RPC call only if necessary. Even if progress is not made, it may make a call if we eliminate the Ping thread (see issue 1201) to prevent the TaskTracker from killing the task.

        The latter is probably the better option, as the logic to decide whether progress has been made may be easier to implement in the thread rather than in the TaskTracker. As discussed earlier in this conversation, we may resume/suspend the thread, or at least make sure we start and stop it at the right places. But I'd suggest we separate the issue of reporting progress locally (via the Reporter object) from reporting progress to the TaskTracker (via a thread). The logic for the two issues is different, and separating the code will make things cleaner and easier to change.
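
        (For illustration, a rough sketch of option 2. All names are hypothetical; the real umbilical RPC has a different signature, so a stand-in interface is used here.)

        interface TaskTrackerLink {               // hypothetical stand-in for the RPC back to the TaskTracker
          void reportProgress(String taskId, float progress) throws java.io.IOException;
        }

        class ProgressReportThread extends Thread {
          private final TaskTrackerLink tracker;
          private final String taskId;
          private volatile float currentProgress = 0.0f;   // updated locally by the task
          private float lastReported = -1.0f;

          ProgressReportThread(TaskTrackerLink tracker, String taskId) {
            this.tracker = tracker;
            this.taskId = taskId;
            setDaemon(true);
          }

          void setProgress(float p) { currentProgress = p; }

          public void run() {
            try {
              while (!isInterrupted()) {
                float p = currentProgress;
                if (p > lastReported) {           // only make the RPC when real progress was made
                  tracker.reportProgress(taskId, p);
                  lastReported = p;
                }
                Thread.sleep(1000);
              }
            } catch (InterruptedException ie) {
              // task finished
            } catch (java.io.IOException ioe) {
              // lost contact with the tracker; task-level handling takes over
            }
          }
        }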

        Owen O'Malley added a comment -

        I feel that this one is important for 0.13.

        Devaraj Das added a comment -

        Vivek, I think it would be useful to open a new issue with the thoughts you put as a comment on this issue (and maybe the other related issues could also be clubbed into that one issue; there are 2-3 related issues). I think this particular issue is going to be closed with the quick fix that Arun is going to submit, and hence there's no point discussing new ideas on this page.

        Arun C Murthy added a comment -

        Here is the short-term fix as discussed: send updates only when sort/merge is on, not all the time...

        Hadoop QA added a comment -

        +1 http://issues.apache.org/jira/secure/attachment/12358481/HADOOP-1431_2_20070530.patch applied and successfully tested against trunk revision r542595.

        Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/213/testReport/
        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/213/console

        Owen O'Malley added a comment -

        +1

        Doug Cutting added a comment -

        Sigh. I wish this just started a new thread around each call to sortAndSpill, as I suggested above, something like:

        Thread progress = createProgressThread(umbilical);
        try {
          sortAndSpill();
        } finally {
          progress.interrupt();
        }

        As it stands, the call to stop the thread is in a finally, but after other things that could throw exceptions, so there's no guarantee that the thread will actually exit. And the calls to pause the thread are not in a finally at all, so, if there's an exception in sorting, progress will not stop. Reusing a thread seems like a premature optimization that opens up lots of possible error modes that we don't need. I think rather we should simply narrow the scope of the prior logic. Threads are plenty cheap for this, and I don't see that the optimization is worth either the risks it adds or the increased code to maintain.

        Arun C Murthy added a comment -

        Doug - point taken about the code complexity; patched anew to use new threads every time...

        Arun C Murthy added a comment -

        Re-submitting to the patch-testing system...

        Hadoop QA added a comment -

        +1 http://issues.apache.org/jira/secure/attachment/12358717/HADOOP-1431_3_20070601.patch applied and successfully tested against trunk revision r543222.

        Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/226/testReport/
        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/226/console

        Raghu Angadi added a comment -

        Doug's comment that was posted to HADOOP-1134 by mistake:

        Calvin Yu noted on hadoop-user that join() seems to sometimes hang even if the thread has been interrupted. In other places we use the idiom of a 'running' flag that's checked in a thread's loop in conjunction with an interrupt, rather than interrupt+join, and that seems to be reliable. So I think we should switch to that here too.

        Also, in the current patch, I don't see why the thread is held in a field. I worry that someone might add code like 'if (sortProgressThread == null) ...', and that we might somehow not always null this field. If it is kept in a local variable around the call then this is much less of a risk.

        So I think we should convert the createProgressThread method to a nested class whose constructor starts the thread and which has a stop() method that sets a flag. It would also be good if the 'try' block could be shared between 'collect()' and 'flush()'. I think this calls for a new method something like:

        private void sortWithProgress() {
          ProgressThread progress = new ProgressThread();
          try {
            sortAndSpillToDisk();
          } finally {
            progress.stop();
          }
        }
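
        (For illustration, a sketch of the nested class described above, using the running-flag idiom. Names are assumptions; Reporter stands for the progress callback, and this is not the committed code.)

        class ProgressThread implements Runnable {
          private final Reporter reporter;        // assumed progress callback
          private volatile boolean running = true;
          private final Thread thread;

          ProgressThread(Reporter reporter) {
            this.reporter = reporter;
            thread = new Thread(this, "SortProgress");
            thread.setDaemon(true);
            thread.start();                       // constructor starts the thread
          }

          public void stop() {
            running = false;                      // flag checked in the loop
            thread.interrupt();                   // wake the sleep; no join, per the discussion
          }

          public void run() {
            while (running) {
              reporter.progress();
              try {
                Thread.sleep(1000);
              } catch (InterruptedException ie) {
                // the 'running' flag decides whether to exit
              }
            }
          }
        }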

        Owen O'Malley added a comment -

        Interrupted is just like a standard running flag. The only problem with it is that the old Hadoop code has a bad habit of clearing the flag.

        I'm not sure what is going on with Calvin's hang, but fundamentally interrupt is just a more powerful form of the running flag. I would not go through and remove the use of interrupt. Removing the join is possible, since we don't really care if the thread is done done.
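
        (For context on "clearing the flag": catching InterruptedException resets the thread's interrupt status, so code that swallows the exception without restoring it loses the shutdown signal. A small self-contained illustration, hypothetical and not Hadoop code.)

        public class InterruptFlagDemo {
          public static void workLoop() {
            while (!Thread.currentThread().isInterrupted()) {
              // ... a unit of work would go here ...
              try {
                Thread.sleep(1000);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();   // restore the status so the loop can exit
              }
            }
          }
        }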

        Doug Cutting added a comment -

        So, to be conservative, let's get rid of the join and just go with interrupt. Thus, the method should look something like:

        private void sortWithProgress() {
           Thread progress = createProgressThread(umbilical);
           try {
             sortAndSpillToDisk();
           } finally {
              progress.interrupt();
           }
        }
        

        We need not define a nested class; we can re-use the existing createProgressThread() method without alteration. Does that sound good?

        Owen O'Malley added a comment -

        +1

        Devaraj Das added a comment -

        Submitting a patch on behalf of Arun (who is right now watching Pirates of the Caribbean Part 3 :) ). This patch has all the comments incorporated. Haven't had the chance to test it on a multi-node cluster.

        Devaraj Das added a comment -

        Did the Open->PA transition to have the patch run through the automated build process.

        Doug Cutting added a comment -

        I just committed this. Thanks Arun & Devaraj!

        Hadoop QA added a comment -

        -1, could not apply patch.

        The patch command could not apply the latest attachment http://issues.apache.org/jira/secure/attachment/12358738/1431.patch as a patch to trunk revision r543607.

        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/230/console

        Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.

        Hadoop QA added a comment -

        Integrated in Hadoop-Nightly #108 (See http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Nightly/108/)

          People

          • Assignee: Arun C Murthy
          • Reporter: Owen O'Malley
          • Votes: 0
          • Watchers: 0
