Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.21.0
    • Component/s: performance, task
    • Labels:
      None
    • Release Note:
      Refactors shuffle code out of ReduceTask into separate classes in a new package (org.apache.hadoop.mapreduce.task.reduce)
      Incorporates MAPREDUCE-240, batches up several map output files from a TT to a reducer in a single transfer
      Introduces new Shuffle counters to keep track of shuffle errors

      Description

      The reduce shuffle code has become very complex and entangled. I think we should move it out of ReduceTask and into a separate package (org.apache.hadoop.mapred.task.reduce). Details to follow.

      1. mapred-318-3Sep-v1.patch
        243 kB
        Jothi Padmanabhan
      2. mapred-318-3Sep.patch
        242 kB
        Jothi Padmanabhan
      3. mapred-318-24Aug.patch
        239 kB
        Jothi Padmanabhan
      4. mapred-318-20Aug.patch
        234 kB
        Jothi Padmanabhan
      5. mapred-318-common.patch
        3 kB
        Jothi Padmanabhan
      6. mapred-318-14Aug.patch
        233 kB
        Jothi Padmanabhan
      7. HADOOP-5233_part0.patch
        21 kB
        Arun C Murthy
      8. HADOOP-5233_api.patch
        12 kB
        Arun C Murthy

        Issue Links

          Activity

          Leitao Guo added a comment -

          Does this patch include what is described in MAPREDUCE-268?

          Mariappan Asokan added a comment -

          Hi Todd,
          Thanks for your comments. I have read more positive comments about the performance improvement that other developers have noticed. I understand your concerns about the stability of the code as well. However, I see the following drawbacks in keeping the existing code:

          • We have to add another configuration parameter to force the execution of the new code.
          • There will be some code duplication. Bug fixes have to go into both code bases until, of course, the 1.x release is obsoleted.
          • If there is any enhancement to be made, one has to incorporate it into both code bases. Just like MAPREDUCE-2454, there is another Jira, MAPREDUCE-4049, that requires two different patches, one for each code base.

          Having said this, I am okay with keeping the existing code and bypassing it only when a configuration parameter is set. I want to know whether there is any other show-stopper along the way. If there is none, I will raise a Jira on this and start working towards it.
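
          A minimal sketch of the kind of configuration gate being discussed, assuming a hypothetical property name (no such key is defined by this issue):

            import org.apache.hadoop.mapred.JobConf;

            class ShuffleSelector {
              // "mapreduce.job.use.new.shuffle" is a made-up property name, used only
              // to illustrate gating the refactored shuffle behind a config switch.
              static boolean useNewShuffle(JobConf conf) {
                return conf.getBoolean("mapreduce.job.use.new.shuffle", false);
              }
            }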

          Todd Lipcon added a comment -

          I'm +1 on providing the new shuffle as an option in branch-1, if it provides better performance. But I think we should maintain the old code as well, in case there are any unforeseen issues with the new codebase. We don't want to destabilize the stable MR task which has had lots of tweaks over the years for various job profiles and rare error cases.

          Mariappan Asokan added a comment -

          Will there be any issues if I try to port this fix to Hadoop 1.1.x branch? I want to do this since it would make back-porting MAPREDUCE-2454 to 1.1.x easier. Please voice your opinions and suggestions before I venture on this.
          Thanks.

          Todd Lipcon made changes -
          Link This issue incorporates MAPREDUCE-268 [ MAPREDUCE-268 ]
          Todd Lipcon made changes -
          Component/s performance [ 12316500 ]
          Component/s task [ 12312920 ]
          Harsh J added a comment -

          A question about why the MAX_MAPS_AT_ONCE limit was chosen to be 20. Would anyone have an answer to this?

          On Sun, Jun 19, 2011 at 4:01 AM, Shrinivas Joshi <jshrinivasatttttgmaildottttttcom> wrote:
          > We see following type of lines in our reducer log files. Based on my
          > understanding it looks like the target map host has 53 map outputs that are
          > ready to be fetched. The shuffle scheduler seems to be allowing only 20 of
          > them to be fetched at a time. This is controlled by MAX_MAPS_AT_ONCE
          > variable in ShuffleScheduler class. Is my understanding of this log output
          > correct? If so, why is MAX_MAPS_AT_ONCE set to 20?
          >
          > Thanks for your time.
          >
          > -Shrinivas
          >
          > INFO org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler:
          > Assiging hostname:50060 with 53 to fetcher#16
          > INFO org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler:
          > assigned 20 of 53 to hostname:50060 to fetcher#16
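
          For illustration, the "assigned 20 of 53" behaviour in the quoted log can be sketched roughly as below; the class and method names are hypothetical, not the actual ShuffleScheduler code:

            import java.util.ArrayList;
            import java.util.List;

            class PerHostAssignment {
              static final int MAX_MAPS_AT_ONCE = 20;  // the constant in question

              // Hand a fetcher at most MAX_MAPS_AT_ONCE of a host's pending map outputs;
              // the remainder waits for a later assignment to that host.
              static List<String> assign(List<String> pendingMapIds) {
                int n = Math.min(MAX_MAPS_AT_ONCE, pendingMapIds.size());
                List<String> batch = new ArrayList<>(pendingMapIds.subList(0, n));
                pendingMapIds.subList(0, n).clear();
                return batch;  // e.g. 20 of the 53 outputs on hostname:50060
              }
            }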

          Tom White made changes -
          Status Resolved [ 5 ] Closed [ 6 ]
          Scott Carey added a comment -

          In addition to a quick code review of the bits I was interested in related to fetching map output fragments, I did a quick and dirty test on trunk on a tiny cluster to make sure that this change had the same effect as the one-line fix I apply to 0.19.2 on production for similar benefits. See my comment from June 10 2009. The old code was artificially throttling the shuffle to one output file per TT per ping-cycle.

          Quite simply, any fix that lets a reducer fetch all the complete map outputs it finds in one ping-cycle helps those jobs with map output counts much greater than node count. One line hack or full refactor.

          The impact really depends on the cluster config and job type... ours is new hardware with plenty of RAM per node, which leads to using ~11+ concurrent map tasks per node and a larger ratio of map shards per reduce to task trackers. The bigger that ratio, the bigger the impact of optimized shuffle fetching.

          Todd Lipcon added a comment -

          Scott: are you running this on the 20 branch or using 21/trunk? If you have a patch rebased onto 20 that you've tested, I'd be interested in taking a look.

          Arun C Murthy added a comment -

          Thanks for sharing that with us, Scott... we aim to please! :)

          Scott Carey added a comment -

          You may also want to note that this change improves performance significantly in some cases, especially when there is a large number of small to medium sized map outputs (many more outputs to fetch per reduce than the number of TaskTrackers).
          In some of my jobs, shuffle times have dropped from 60% of the job time to < 5%.

          For a given job, shuffle time is FAR less sensitive to the number of maps and reducers than it was before.

          Jothi Padmanabhan made changes -
          Release Note Refactors shuffle code out of ReduceTask into separate classes in a new package(org.apache.hadoop.mapreduce.task.reduce)
          Incorporates MAPREDUCE-240, batches up several map output files from a TT to a reducer in a single transfer
          Introduces new Shuffle counters to keep track of shuffle errors
          Hudson added a comment -

          Integrated in Hadoop-Mapreduce-trunk #75 (See http://hudson.zones.apache.org/hudson/job/Hadoop-Mapreduce-trunk/75/)

          Hudson added a comment -

          Integrated in Hadoop-Mapreduce-trunk-Commit #13 (See http://hudson.zones.apache.org/hudson/job/Hadoop-Mapreduce-trunk-Commit/13/)
          . Modularizes the shuffle code. Contributed by Jothi Padmanabhan and Arun Murthy.

          Devaraj Das made changes -
          Status Patch Available [ 10002 ] Resolved [ 5 ]
          Fix Version/s 0.21.0 [ 12314045 ]
          Resolution Fixed [ 1 ]
          Devaraj Das added a comment -

          I just committed this. Thanks, Jothi and Arun for the development work on this! Thanks, Iyappan for the rigorous testing!

          Iyappan Srinivasan added a comment -

          +1 for testing

          Cluster conf: mapred.child.java.opts 512M and io.sort.factor 100.
          Namenode heap size is 3 GB and jobtracker heap size is 1 GB.

          Some benchmarking and functionality test results.

          1) Default sort on a 94-node cluster:
          trunk, two attempts: 1) 2376 seconds 2) 2589 seconds
          with patch, last two attempts: 1) 1408 seconds 2) 1381 seconds

          2) loadgen on a 94-node cluster:
          trunk: 57 minutes
          with patch, two attempts: 1) 56 minutes 9 seconds 2) 56 minutes 23 seconds

          3) gridmix2 on a 491-node cluster:
          trunk: 1 hour 7 minutes
          with patch, two attempts: 1) 57 minutes 2) 47 minutes

          4) Sort (with memory-to-memory merge enabled): passed

          5) After starting the job with mapred.reduce.slowstart.completed.maps=1, remove some intermediate map outputs and corrupt others, then verify that only those tasks are rerun (see the configuration sketch below).
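
          For reference, the setup in item 5 can be expressed as the following configuration sketch (property name as quoted above, pre-0.21 mapred namespace):

            import org.apache.hadoop.mapred.JobConf;

            class SlowstartTestSetup {
              // Make reduces wait until every map has finished before starting, which
              // leaves a window to remove or corrupt map outputs before the shuffle.
              static void configure(JobConf conf) {
                conf.setFloat("mapred.reduce.slowstart.completed.maps", 1.0f);
              }
            }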

          Jothi Padmanabhan added a comment -

          The timed-out test, TestNodeRefresh, is a known issue (MAPREDUCE-943).

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12418478/mapred-318-3Sep-v1.patch
          against trunk revision 810505.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 15 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          -1 core tests. The patch failed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/36/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/36/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/36/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/36/console

          This message is automatically generated.

          Jothi Padmanabhan made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Jothi Padmanabhan made changes -
          Attachment mapred-318-3Sep-v1.patch [ 12418478 ]
          Jothi Padmanabhan added a comment -

          Updated patch

          Jothi Padmanabhan made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12418466/mapred-318-3Sep.patch
          against trunk revision 810505.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 12 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          -1 contrib tests. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/35/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/35/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/35/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/35/console

          This message is automatically generated.

          Jothi Padmanabhan added a comment -

          > The patch looks fine. Could you comment on the deadlocked shuffle you saw once, since I see you've reverted back to the 'original' patch w.r.t. the fetcher threads and stalling?

          We have not seen the deadlock since; it appears to be a one-off issue. We did some exhaustive testing and could not reproduce it at all.

          Jothi Padmanabhan made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Jothi Padmanabhan made changes -
          Attachment mapred-318-3Sep.patch [ 12418466 ]
          Jothi Padmanabhan added a comment -

          Updated patch

          Hudson added a comment -

          Integrated in Hadoop-Common-trunk-Commit #8 (See http://hudson.zones.apache.org/hudson/job/Hadoop-Common-trunk-Commit/8/)
          HADOOP-6226. Moves BoundedByteArrayOutputStream from the tfile package to the io package and makes it available to other users (). Contributed by Jothi Padmanabhan.

          Hudson added a comment -

          Integrated in Hadoop-Common-trunk #80 (See http://hudson.zones.apache.org/hudson/job/Hadoop-Common-trunk/80/)
          HADOOP-6224. Adds methods to read strings safely, makes the Buffer class in DataOutputBuffer public, and introduces public constructors there. These changes are required for . Contributed by Jothi Padmanabhan and Arun Murthy.

          Hudson added a comment -

          Integrated in Hadoop-Common-trunk-Commit #3 (See http://hudson.zones.apache.org/hudson/job/Hadoop-Common-trunk-Commit/3/)
          HADOOP-6224. Adds methods to read strings safely, makes the Buffer class in DataOutputBuffer public, and introduces public constructors there. These changes are required for . Contributed by Jothi Padmanabhan and Arun Murthy.

          Arun C Murthy added a comment -

          > Document rationale of why all fetch threads will never stall i.e. will allow one thread to go past always

          The patch looks fine. Could you comment on the deadlocked shuffle you saw once, since I see you've reverted back to the 'original' patch w.r.t. the fetcher threads and stalling?

          Jothi Padmanabhan made changes -
          Attachment mapred-318-24Aug.patch [ 12417475 ]
          Jothi Padmanabhan added a comment -

          New patch with review comments incorporated.
          Also fixed some findbugs warnings.

          Arun C Murthy added a comment -

          Oh, and some benchmark numbers would be nice to have. Thanks!

          Arun C Murthy added a comment -

          It's fascinating to find creative ways to kick myself after a quarter-year... here goes:

          1. Non-public APIs being made public... javadoc warnings?
          2. Remove logSlowShuffle - that was only for debugging
          3. Cut down logging at INFO level
          4. Re-factor construction/parsing of shuffle request and document heavily.
          5. Create an object to represent the shuffle header and make it a Writable
          6. Fetcher
            • Rename STALLED_COPY_TIMEOUT to DEFAULT_STALLED_COPY_TIMEOUT
            • Pull sanity checks in copyMapOutput to a separate function and document
          7. MergeManager
            • Document rationale of why all fetch threads will never stall i.e. will allow one thread to go past always
            • Document rationale of having unconditionalReserve and reserve (mem-to-mem merger needs this)
          8. Testing
            • Test with exceptions (IOException, OOM etc.) from each of the threads: Fetcher, EventFetcher, ShuffleScheduler etc.

          :) I'm pretty sure I can't be blamed for:

            -      throws ClassNotFoundException, IOException, InterruptedException {
            +      throws Throwable {
          

          Please do not use this bad idiom.

          Jothi Padmanabhan made changes -
          Attachment mapred-318-20Aug.patch [ 12417146 ]
          Jothi Padmanabhan added a comment -

          Some more modifications to the previous patch:

          1. The shuffle status on the web UI (the number of maps being copied and the bandwidth) is now present. This makes it similar to the current trunk status.
          2. Modified the condition for triggering an on-disk merge to: number of on-disk files > (2 * io.sort.factor - 1), similar to the current trunk code. This ensures we do merges a little less aggressively (see the sketch after this list).
          3. Modified to trigger an in-memory merge on stall. We cannot trigger only on crossing the memory threshold; that could hang when several fetchers simply return because there is not enough memory for the current map output even though the total memory used has not crossed the threshold.
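
          A minimal sketch of the on-disk merge trigger described in item 2; the method and variable names are illustrative, not the patch's actual fields:

            class OnDiskMergeTrigger {
              // Merge only once more than (2 * io.sort.factor - 1) on-disk files have
              // accumulated, so merges fire less aggressively than once per new file.
              static boolean shouldMerge(int numOnDiskFiles, int ioSortFactor) {
                return numOnDiskFiles > (2 * ioSortFactor - 1);
              }
            }

          With io.sort.factor set to 100, for example, an on-disk merge would start only once the 200th file arrives.
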
          Jothi Padmanabhan made changes -
          Attachment mapred-318-14Aug.patch [ 12416553 ]
          Attachment mapred-318-common.patch [ 12416554 ]
          Jothi Padmanabhan added a comment -

          Patches for review

          Owen O'Malley made changes -
          Project Hadoop Common [ 12310240 ] Hadoop Map/Reduce [ 12310941 ]
          Key HADOOP-5223 MAPREDUCE-318
          Component/s mapred [ 12310690 ]
          Fix Version/s 0.21.0 [ 12313563 ]
          Scott Carey added a comment -

          I've had issues with the shuffle on 0.19. On a cluster with new hardware capable of running 13+ maps and 11+ reduces concurrently (dual quad-core with hyperthreading = 16 hardware threads, 24GB RAM, 4 drives), the shuffle is always my bottleneck on any job where the maps aren't huge or they condense data down significantly before the reduce. During this bottleneck, disk, network, and CPU are calm. I collected quite a few thread dumps in this state on many different jobs. Increasing parallel copies and tasktracker http threads had no effect. For the most part, the thread dumps always had the shuffle fetch threads idle and the main thread here:

          "main" prio=10 tid=0x000000005eed3800 nid=0x62a2 waiting on condition [0x0000000040eb4000]
          java.lang.Thread.State: TIMED_WAITING (sleeping)
          at java.lang.Thread.sleep(Native Method)
          at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.getMapCompletionEvents(ReduceTask.java:2244)
          at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.fetchOutputs(ReduceTask.java:1716)
          at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:368)
          at org.apache.hadoop.mapred.Child.main(Child.java:158)

          There is a one-line fix. However, this JIRA is refactoring the code that the fix would apply to, so I'll post my info here to make sure the improvements are contained in it.
          The fix improves times by 40% on a composite set of jobs (a cascading flow consisting of 25+ map/reduce jobs, with up to 7 concurrent ones).

          First, the fix I made is below:

          Comment out or delete the line:

           break; // we have a map from this host

          in ReduceTask.java in ReduceCopier.fetchOutputs()
          – line 1804 in 0.19.2-dev, line 1911 in 0.20.1-dev, and line 1954 currently on trunk.

          Reduces that took 1 to 2 minutes while copying little data in the shuffle phase on average on my set of jobs now take 1 to 3 seconds on average in the shuffle.

          Here is the problem. The shuffle is currently limiting itself to only copy one shard from a single host per pass, and then sleeping. As long as the number of map shards is much more than the number of hosts, this requires quite a few sleep delays. For servers that can handle many map and reduce tasks each, this gets out of hand quickly, especially on small or medium sized clusters where the ideal concurrent shuffle copies per reduce is on the order of, or larger than, the number of hosts.

          A more sophisticated fix such as this JIRA will do more, but the low-hanging-fruit performance fix is to fetch every shard reported by the last ping before sleeping again and checking for more. This not only improves the shuffle speed but also reduces the total number of pings needed to find out what shards are available, which reduces load elsewhere. It makes little sense to do what happens now on a small cluster:
          Discover that, say, 100 shards need to be fetched, grab 8 of them, then sleep, ask again what is available, grab only 8, sleep...
          At the very least, if there are 100 map outputs available to a reducer, it should keep draining from this list before sleeping and asking for an updated set.

          Some may object to opening more than one concurrent connection to a host on the basis that it could overload a tasktracker, but this seems like a false assumption to me. First, tasktrackers throttle this with the configuration parameter for the number of http threads. Second, reduces throttle this with the number of concurrent shuffle fetch threads. There is no difference between a reduce opening 10 concurrent shuffle threads to 10 hosts and 10 to one host; when all reduces are doing this concurrently and choosing hosts randomly, the average number of concurrent connections on one TT remains the same.
          If it is a serious concern for other reasons (the 'penalty box'? or other error handling?), then the shuffle queue could be filled in a better order than one host at a time, or at least not sleep and re-fetch the list without first draining it. A more significant refactor may do better than the one-liner, but I suspect this alone is most of the performance gain.
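
          The scheduling difference described above can be sketched roughly as follows. This is an illustrative outline only, not the actual ReduceCopier code; the class, interface, and method names are hypothetical:

            import java.util.List;
            import java.util.Map;

            class SchedulingSketch {
              interface FetcherPool { void fetch(String mapOutputId); }

              // Old behaviour: schedule at most one known output per host per pass,
              // then sleep and re-poll for map completion events.
              static void scheduleOnePerHost(Map<String, List<String>> outputsByHost,
                                             FetcherPool fetchers) {
                for (List<String> outputs : outputsByHost.values()) {
                  if (!outputs.isEmpty()) {
                    fetchers.fetch(outputs.remove(0));  // the 'break' above has this effect
                  }
                }
              }

              // Behaviour with the fix: drain every output already known before sleeping again.
              static void scheduleAllKnown(Map<String, List<String>> outputsByHost,
                                           FetcherPool fetchers) {
                for (List<String> outputs : outputsByHost.values()) {
                  while (!outputs.isEmpty()) {
                    fetchers.fetch(outputs.remove(0));
                  }
                }
              }
            }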

          Here is a sample log before and after the change on 0.19 with a small dev cluster with newer hardware – a particularly bad case for this:
          3 TT's, each configured for 13 concurrent maps, 11 concurrent reduces, 10 concurrent shuffle copies, 40 TT http threads:

          Before:

          2009-06-09 22:13:53,657 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Need another 51 map output(s) where 0 is already in progress
          2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082206_0006_m_000050_0'
          2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0: Got 51 new map-outputs
          2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0: Got 1 obsolete map-outputs from tasktracker 
          2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup hosts)
          2009-06-09 22:13:53,689 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 54394 bytes (54398 raw bytes) into RAM from attempt_200906082206_0006_m_000014_0
          2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 70736 bytes (70740 raw bytes) into RAM from attempt_200906082206_0006_m_000003_0
          2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 73540 bytes (73544 raw bytes) into RAM from attempt_200906082206_0006_m_000001_0
          2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 54394 bytes from map-output for attempt_200906082206_0006_m_000014_0
          2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 70736 bytes from map-output for attempt_200906082206_0006_m_000003_0
          2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 73540 bytes from map-output for attempt_200906082206_0006_m_000001_0
          2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000014_0 -> (21, 205) from 10.3.0.142
          2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000003_0 -> (21, 240) from 10.3.0.143
          2009-06-09 22:13:53,693 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000001_0 -> (21, 204) from 10.3.0.141
          2009-06-09 22:13:55,662 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup hosts)
            
           -- SNIP --
          
          2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Read 79913 bytes from map-output for attempt_200906082206_0006_m_000042_0
          2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000042_0 -> (21, 237) from 10.3.0.141
          2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
          2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
          2009-06-09 22:14:50,752 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 51 files left.
          2009-06-09 22:14:50,813 INFO org.apache.hadoop.mapred.Merger: Merging 51 sorted segments
          2009-06-09 22:14:50,817 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 51 segments left of total size: 3325252 bytes
          

          After – (slightly different job):

          2009-06-08 23:51:07,057 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Thread waiting: Thread for merging on-disk files
          2009-06-08 23:51:07,058 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Need another 68 map output(s) where 0 is already in progress
          2009-06-08 23:51:07,069 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000060_1'
          2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000014_0'
          2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0: Got 68 new map-outputs
          2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0: Got 2 obsolete map-outputs from tasktracker 
          2009-06-08 23:51:07,071 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Scheduled 68 outputs (0 slow hosts and0 dup hosts)
          2009-06-08 23:51:07,106 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 674904 bytes (674908 raw bytes) into RAM from attempt_200906082336_0014_m_000005_0
          2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Read 674904 bytes from map-output for attempt_200906082336_0014_m_000005_0
          2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082336_0014_m_000005_0 -> (61, 26) from 10.3.0.143
          
          -- SNIP --
          
          2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Read 1439739 bytes from map-output for attempt_200906082336_0014_m_000012_1
          2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082336_0014_m_000012_1 -> (50, 25) from 10.3.0.141
          2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
          2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
          2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 68 files left.
          2009-06-08 23:51:09,122 INFO org.apache.hadoop.mapred.Merger: Merging 68 sorted segments
          2009-06-08 23:51:09,126 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 52 segments left of total size: 44450255 bytes
          
          Arun C Murthy added a comment -

          Forgot to add that we felt it's better to move the responsibility of tracking the shuffle into a new ShuffleScheduler; the Fetcher directly informs the ShuffleScheduler about the success/failure of each map-output without going through another queue of CopyResults as it does today.

          Arun C Murthy made changes -
          Attachment HADOOP-5233_part0.patch [ 12400093 ]
          Arun C Murthy added a comment -

          Another puzzle piece: this includes a completed Fetcher along with some minor changes to the API and a newly added 'ShuffleContext' to pass around...

          Arun C Murthy made changes -
          Field Original Value New Value
          Attachment HADOOP-5233_api.patch [ 12400049 ]
          Arun C Murthy added a comment -

          Here is a rough draft of the API we've been working on...

          Devaraj Das added a comment -

          This looks nice. One other thing that probably should be added is the failure handling (failed-fetch notifications, etc. on getting a CopyResult with an error).

          Owen O'Malley added a comment -

          We also plan to incorporate HADOOP-1338 into this patch, where the fetchers will request a list of map outputs and the map outputs look like:

          <vint id length> <task id>
          <vint compressed size>
          <vint raw size>
          <map data>
          ... repeated for each map.
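
          A rough sketch of that framing, using Hadoop's WritableUtils for the variable-length integers. This is illustrative only; the committed header may differ in field types and layout (the sizes are written here as vlongs):

            import java.io.DataInput;
            import java.io.DataOutput;
            import java.io.IOException;
            import java.nio.charset.StandardCharsets;
            import org.apache.hadoop.io.WritableUtils;

            class MapOutputFrame {
              // Writes one frame: <vint id length> <task id> <vint compressed size>
              // <vint raw size> <map data>, repeated once per map output.
              static void write(DataOutput out, String taskId, long compressedSize,
                                long rawSize, byte[] mapData) throws IOException {
                byte[] id = taskId.getBytes(StandardCharsets.UTF_8);
                WritableUtils.writeVInt(out, id.length);        // <vint id length>
                out.write(id);                                  // <task id>
                WritableUtils.writeVLong(out, compressedSize);  // <vint compressed size>
                WritableUtils.writeVLong(out, rawSize);         // <vint raw size>
                out.write(mapData);                             // <map data>
              }

              // Reads back the task id of one frame; the size fields and data follow.
              static String readTaskId(DataInput in) throws IOException {
                byte[] id = new byte[WritableUtils.readVInt(in)];
                in.readFully(id);
                return new String(id, StandardCharsets.UTF_8);
              }
            }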

          Owen O'Malley added a comment -

          Roughly, I think the flow should look like:

          EventFetcher -> HostPlanner -> FetcherPool -> OutputMerger

          There is also a main shuffle object that tracks the progress of the shuffle. Each of these should be a separate class. The EventFetcher gets the map completion events from the TaskTracker. The HostPlanner will keep track of available map outputs, penalty box, and hands out hosts that are ready to the fetchers. The FetcherPool is pool of threads that are doing the actual copy of data. The OutputMerger manages the in memory and on disk data and has a thread to do merges.

          We'll post a patch with the api soon.
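
          As an illustration of that split, a skeleton might look like the following; the interface names follow the comment above, while the method signatures are assumptions for illustration only:

            import java.util.List;

            class ShuffleFlowSketch {
              static class MapOutputLocation { String taskId; String host; }

              interface EventFetcher {      // polls the TaskTracker for map completion events
                List<MapOutputLocation> fetchNewCompletionEvents();
              }

              interface HostPlanner {       // tracks available outputs and the penalty box
                void addOutputs(List<MapOutputLocation> outputs);
                List<MapOutputLocation> nextReadyHostBatch();  // hands a ready host to a fetcher
              }

              interface FetcherPool {       // pool of threads doing the actual copies
                void copy(List<MapOutputLocation> hostBatch, OutputMerger sink);
              }

              interface OutputMerger {      // manages in-memory/on-disk data and merging
                void store(MapOutputLocation loc, byte[] data);
              }
            }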

          Owen O'Malley created issue -

            People

            • Assignee:
              Owen O'Malley
              Reporter:
              Owen O'Malley
            • Votes:
              0
              Watchers:
              35

              Dates

              • Created:
                Updated:
                Resolved:

                Development