Hadoop Common / HADOOP-5572

The map progress value should have a separate phase for doing the final sort.

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.21.0
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed

      Description

      Currently, the final spill and sort do not record any progress while they run, leading to the perception that the map is done but "stuck".

      This patch reserves 33.3% of the map task's progress for the sort by dividing the map task into 2 phases. It also makes the progress of the sort phase (merges) work, on both the map side and the reduce side.

      1. HADOOP-5572.v1.patch
        35 kB
        Ravi Gummadi
      2. HADOOP-5572.v1.4.patch
        41 kB
        Ravi Gummadi
      3. HADOOP-5572.v1.3.patch
        39 kB
        Ravi Gummadi
      4. HADOOP-5572.v1.2.patch
        38 kB
        Ravi Gummadi
      5. HADOOP-5572.v1.1.patch
        36 kB
        Ravi Gummadi
      6. HADOOP-5572.patch
        33 kB
        Ravi Gummadi

          Activity

          Owen O'Malley added a comment -

          I think we should reserve 33% of the map task's progress range for the final sort. It is probably more than necessary, but is consistent with the reduce's sort.

          Arun C Murthy added a comment -

          +1

          Ravi Gummadi added a comment -

          We are planning to allocate 33% of the map task's progress to the final sort.

          Since merge progress is currently not updated (on both the map side and the reduce side), even if we allocate 33% of the map task's progress to the sort (merge), map progress will be stuck at 66.7% until the sort (merge) finishes, and will then jump from 66.7% to 100%. This could affect speculative execution.

          Here is a proposal for updating sort/merge progress approximately.

          In merge(), each merge consumes the smallest io.sort.factor segments. So, assuming there is no combiner, we calculate the denominator for mergeProgress as follows before the merges begin (a sketch in Java follows this list):

          1. We maintain a sorted list of the sizes of the segments to be merged. We add up the sizes of the smallest 'factor' segments (those that will be merged first), append the sum to the list, and remove those smallest sizes. We repeat this until one element is left in the list; that element is the denominator for mergeProgress during the 1st merge.
          2. As the segments are read during a merge, the numerator is incremented based on the position in each segment, and mergeProgress is updated.
          3. After each merge, the denominator is decreased by the difference (inputRecordsForThisMerge - mergedRecordsInThisMerge), to get a better approximation of mergeProgress when the combiner is called in merges.
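
          A minimal Java sketch of this pre-merge estimate, under the stated no-combiner assumption. The class and method names here are illustrative, not the patch's actual code; the includeFinalMerge flag models the reduce-side exclusion discussed below:

          import java.util.PriorityQueue;

          public class MergeBytesEstimate {

            static long computeBytesInMerges(long[] segmentSizes, int factor,
                                             boolean includeFinalMerge) {
              PriorityQueue<Long> sizes = new PriorityQueue<>();
              for (long s : segmentSizes) {
                sizes.add(s);
              }
              long totalBytes = 0;
              boolean first = true;
              while (sizes.size() > 1) {
                int n = sizes.size();
                if (first && n > factor) {
                  // Hadoop's first-merge rule: merge just enough segments so
                  // that every later merge takes exactly 'factor' segments.
                  int mod = (n - 1) % (factor - 1);
                  n = (mod == 0) ? factor : mod + 1;
                } else {
                  n = Math.min(factor, n);
                }
                first = false;
                boolean finalMerge = (n == sizes.size());
                long merged = 0;
                for (int i = 0; i < n; i++) {
                  merged += sizes.poll(); // consume the smallest segments
                }
                if (finalMerge && !includeFinalMerge) {
                  break; // reduce side: the last merge belongs to the reduce phase
                }
                totalBytes += merged; // bytes this merge will read
                sizes.add(merged);    // the merge output becomes a new segment
              }
              return totalBytes;
            }

            public static void main(String[] args) {
              // Segments from the worked example in the next comment, factor 3:
              System.out.println(computeBytesInMerges(
                  new long[] {10, 20, 30, 40, 50, 200}, 3, true)); // prints 480
            }
          }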

          In this approach, mergeProgress is not very accurate when the combiner is used in merges, for 2 reasons:
          (1) The total size of the data to be merged across all merges cannot be estimated exactly before the merges when a combiner is present.
          (2) The sizes of compressed and uncompressed (in-memory) segments are treated alike.

          This would also avoid the jump of reduce task progress from 33.3% to 66.7%. On the reduce side, when estimating the total size of the data that will be merged (the denominator computed from the list of segment sizes), we have to leave out the sizes of the segments in the last merge of 'factor' segments, because that last merge is considered part of the reduce task's 3rd phase (i.e., the reduce phase).

          Thoughts?

          Ravi Gummadi added a comment -

          For example, suppose there are 6 segments (of lengths 10, 20, 30, 40, 50, 200) as input to merge() in a map task, with io.sort.factor = 3 (so the 1st merge takes only the 2 smallest segments); totalBytesProcessed is incremented whenever the position in any segment advances.

          totalBytes = (10+20)      // 1st merge
                     + (30+30+40)   // 2nd merge (30 = estimated output of the 1st merge)
                     + (100+50+200) // 3rd merge (100 = estimated output of the 2nd merge)
                     = 480          // denominator in the computation of mergeProgress during the 1st merge

          After the 1st merge, mergeProgress = totalBytesProcessed/totalBytes = (10+20)/480.
          Let us say the merged segment of the 1st merge (input sizes 10, 20) has length 25 because of the combiner.
          totalBytes = 480 - (30-25) = 475; // denominator in the computation of mergeProgress during the 2nd merge

          After the 2nd merge, mergeProgress = (30+(25+30+40))/475.
          Let us say the merged segment of the 2nd merge (input sizes 25, 30, 40) has length 85 because of the combiner.
          totalBytes = 475 - (25+30+40-85) = 465; // denominator in the computation of mergeProgress during the 3rd merge

          After the 3rd merge, mergeProgress = (30+(25+30+40)+(85+50+200))/465 = 460/465 ≈ 0.99. The small shortfall from 1.0 is the approximation error described above; the sort phase is simply marked complete once the final merge finishes.
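
          As a cross-check, a few lines of Java replay this bookkeeping (the output sizes 25 and 85 are the assumed combiner results from this example, and the final merge is assumed not to shrink its input further):

          public class MergeProgressExample {
            public static void main(String[] args) {
              long totalBytes = 480; // initial estimate, assuming no combiner
              long totalBytesProcessed = 0;

              // Each row: {bytes read by this merge, bytes written by this merge}.
              long[][] merges = {
                  {10 + 20, 25},       // 1st merge: combiner shrinks 30 -> 25
                  {25 + 30 + 40, 85},  // 2nd merge: combiner shrinks 95 -> 85
                  {85 + 50 + 200, 335} // 3rd merge: assumed no further shrinkage
              };

              for (long[] m : merges) {
                totalBytesProcessed += m[0];
                System.out.printf("mergeProgress = %d/%d = %.3f%n",
                    totalBytesProcessed, totalBytes,
                    (double) totalBytesProcessed / totalBytes);
                totalBytes -= (m[0] - m[1]); // denominator correction for the next merge
              }
            }
          }

          This prints 30/480 ≈ 0.063, 125/475 ≈ 0.263, and 460/465 ≈ 0.989, matching the arithmetic above.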

          Owen O'Malley added a comment -

          The simple change would help speculative execution, since it is a closer approximation of the actual progress. Currently, speculation on maps often doesn't happen when it should, because the map is stuck at 100%.

          I don't think we should be tracking the progress as each record is consumed in the merge sort. That will be too much overhead, unless you are very careful. If you want to weight the progress by how many bytes are in each segment, that would provide a better match. Do make sure that you consider how combiners will affect the progress meters.

          Ravi Gummadi added a comment -

          This patch reserves 33.3% of the map task's progress for the sort, so the map task now has 2 phases: a map phase and a sort phase. Progress can now carry different weights for different phases; in the map task, the map phase accounts for 66.7% and the sort phase for 33.3% (pictured in the sketch below). The patch also makes the code changes needed to update merge progress on both the map side and the reduce side.

          Testing is in progress.

          Please review and provide your comments.
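
          To picture the weighting, here is a hypothetical, stripped-down weighted progress node (a simplification for illustration, not the actual org.apache.hadoop.util.Progress API). It also guards against phase weights summing past 1, a point raised in review below:

          import java.util.ArrayList;
          import java.util.List;

          public class WeightedProgress {
            private final List<WeightedProgress> phases = new ArrayList<>();
            private final List<Float> weights = new ArrayList<>();
            private float progress;   // this node's own progress, in [0, 1]
            private float weightUsed; // running sum of child phase weights

            WeightedProgress addPhase(float weight) {
              if (weightUsed + weight > 1.0f) {
                throw new IllegalArgumentException("phase weights exceed 1");
              }
              weightUsed += weight;
              WeightedProgress phase = new WeightedProgress();
              phases.add(phase);
              weights.add(weight);
              return phase;
            }

            void set(float progress) { this.progress = progress; }

            /** Weighted sum over child phases, or own progress for a leaf. */
            float get() {
              if (phases.isEmpty()) {
                return progress;
              }
              float sum = 0f;
              for (int i = 0; i < phases.size(); i++) {
                sum += weights.get(i) * phases.get(i).get();
              }
              return sum;
            }

            public static void main(String[] args) {
              WeightedProgress mapTask = new WeightedProgress();
              WeightedProgress mapPhase = mapTask.addPhase(0.667f);
              WeightedProgress sortPhase = mapTask.addPhase(0.333f);
              mapPhase.set(1.0f);  // map phase done
              sortPhase.set(0.5f); // merges have read half of their bytes
              System.out.println(mapTask.get()); // ~0.83 instead of sitting at 0.667
            }
          }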

          Jothi Padmanabhan added a comment -

          Some initial comments:

           1. Ensure that the sum of the weights for a phase does not exceed 1.
           2. Having a boolean variable to keep track of whether the weights are fixed or variable is a better option.
           3. Merger: sort the segments only if numSegments > factor.
           4. Relying on writesCounter to decide the includeFinalMerge variable is not a good idea.
           5. computeBytesInMerges should disregard empty segments; we probably need to add an isEmpty() API to Segment.
          Ravi Gummadi added a comment -

          Incorporated Jothi's first 3 comments.
          Discussed comments 4 and 5 with Jothi offline. For comment 4, there seems to be no cleaner way, so it is kept as is. Regarding comment 5, checking for empty segments (by reading the segments) before the actual merges seems costly in terms of performance, so empty segments are not handled separately in our estimation, on the assumption that this won't hurt the approximation of mergeProgress much.

          Fixed an issue in informReduceProgress() by changing the call from Progress.get() to Progress.getInternal(), because we need the progress of this phase/node only, not of the whole tree (see the sketch below). Made Progress.getInternal() public.

          Attaching patch with the above changes. Please review and provide your comments.
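
          The node-versus-tree distinction can be pictured with the hypothetical WeightedProgress sketch from the earlier comment (again an illustration, not Hadoop's actual Progress API): a whole-tree getter folds every phase's weight into one number, while reading a phase node directly yields just that phase's fraction.

          // Assumes the WeightedProgress sketch from the earlier comment.
          public class NodeVsTreeProgress {
            public static void main(String[] args) {
              WeightedProgress reduceTask = new WeightedProgress();
              WeightedProgress copyPhase = reduceTask.addPhase(0.33f);
              WeightedProgress sortPhase = reduceTask.addPhase(0.33f);
              WeightedProgress reducePhase = reduceTask.addPhase(0.33f); // not started
              copyPhase.set(1.0f); // copy finished
              sortPhase.set(0.6f); // sort-phase merges 60% done

              System.out.println(reduceTask.get()); // whole tree: ~0.53
              System.out.println(sortPhase.get());  // this node only: 0.6
            }
          }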

          Ravi Gummadi added a comment -

          The earlier patch was missing the code change for the boolean variable that tracks whether the weights are fixed or variable.
          Attaching a new patch with that change.
          Please review and provide your comments.

          Jothi Padmanabhan added a comment -

          Looks good. A few more points:

           1. Create a new method getProgressForCurrentNode() and keep getInternal() private.
           2. addPhases(int) should call the existing addPhase method instead of duplicating code.
           3. The check for (mapFinishTime.length() > 0) is not required in the JT.
           4. MapTask: isMapOrReduceTask can use isMapTask().
           5. Check that progress is being updated correctly and works fine with the new Reducer API.
           6. Merger: remove the Collections.sort() at the beginning.
           7. Can we do better than relying on writesCounter to determine whether the final merge needs to be included in the calculation?
          Ravi Gummadi added a comment -

          Made code changes as per Jothi's first 4 comments.

          5. Check that progress is being updated correctly and works fine with the new Reducer API.

          Since progress is not updated by the new Reducer API while records are being fed to the reducer, reduce task progress stays at 66.66% and then jumps to 100% when the task is done. Maybe we need to file a separate JIRA for the new API to update progress the way the old API does.

          6. Merger: remove the Collections.sort() at the beginning.

          OK. Removed the sort() at the beginning of merge() and changed the callers to pass sorted segments to merge() when there are more than ioSortFactor segments.
          Changed mergeParts() to call merge() with sorted segments if there are more than ioSortFactor segments. Earlier, mergeParts() sent unsorted segments to merge(), and the segments were sorted only after the first intermediate merge, so the 1st merge was not merging the smallest segments.
          Removed the sort() call after each intermediate merge; instead, the merged segment is inserted into the already-sorted segment list (see the sketch below). This could improve performance, since an O(n log n) sort after every intermediate merge is costly.
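
          A minimal sketch of this insert-instead-of-re-sort idea (illustrative only; plain Long sizes stand in for Segment objects):

          import java.util.ArrayList;
          import java.util.Collections;
          import java.util.List;

          public class SortedSegmentInsert {

            /** Insert 'merged' into an already-sorted list, keeping it sorted:
             *  an O(log n) search plus an O(n) shift, instead of an
             *  O(n log n) re-sort after every intermediate merge. */
            static void insertSorted(List<Long> sortedSegments, long merged) {
              int pos = Collections.binarySearch(sortedSegments, merged);
              if (pos < 0) {
                pos = -pos - 1; // binarySearch convention for "not found"
              }
              sortedSegments.add(pos, merged);
            }

            public static void main(String[] args) {
              List<Long> segments = new ArrayList<>(List.of(30L, 40L, 50L, 200L));
              insertSorted(segments, 30L);  // output of the 1st merge (10 + 20)
              System.out.println(segments); // [30, 30, 40, 50, 200]
            }
          }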

          7. Can we do better than relying on writesCounter to determine whether the final merge needs to be included in the calculation?

          I couldn't see a cleaner/better way of doing this.

          Attaching patch with the above changes. Please review and provide your comments.

          Ravi Gummadi added a comment -

          The earlier patch broke backward compatibility of the public method JobHistory.MapAttempt.logFinished() by adding a new parameter.

          Attaching a new patch with the fix. Please review and provide your comments.

          Ravi Gummadi added a comment -

          In Merger, removed the reliance on writesCounter for determining whether the final merge should be counted as part of the sort phase, by adding a new static method that sets the boolean variable includeFinalMerge.

          Also made code changes so that the web UI now shows the "Map Phase Finished" time for map tasks.

          Please review and provide your comments.

          Ravi Gummadi added a comment -

          ant test-patch gave:

          [exec] -1 overall.
          [exec] +1 @author. The patch does not contain any @author tags.
          [exec] +1 tests included. The patch appears to include 6 new or modified tests.
          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
          [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
          [exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.
          [exec] -1 release audit. The applied patch generated 492 release audit warnings (more than the trunk's current 489 warnings).

          Unit tests passed on my local machine.

          Devaraj Das added a comment -

          I just committed this. Thanks, Ravi!

          Robert Chansler added a comment -

          Editorial pass over all release notes prior to publication of 0.21.


            People

            • Assignee: Ravi Gummadi
            • Reporter: Owen O'Malley
            • Votes: 0
            • Watchers: 8
