Hadoop Common / HADOOP-910

Reduces can do merges for the on-disk map output files in parallel with their copying

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.17.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Reducers now perform merges of shuffle data (both in-memory and on disk) while fetching map outputs. Earlier, during shuffle they used to merge only the in-memory outputs.

      Description

      Proposal to extend the parallel in-memory-merge/copying, that is being done as part of HADOOP-830, to the on-disk files.

      Today, the Reduces dump the map output files to disk, and the final merge happens only after all the map outputs have been collected. It might make sense to parallelize this part. That is, whenever a Reduce has collected io.sort.factor segments on disk, it initiates a merge of those and creates one big segment. If the rate of copying is faster than the merge, we can probably have multiple threads doing parallel merges of independent sets of io.sort.factor segments. If the rate of copying is not as fast as the merge, we stand to gain a lot: at the end of copying all the map outputs, we will be left with a small number of segments for the final merge, which hopefully can feed the reduce directly (via the RawKeyValueIterator) without having to hit the disk to write additional output segments.
      If the disk bandwidth is higher than the network bandwidth, we have a good story, I guess, to do such a thing.
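      For illustration only, a minimal Java sketch of the proposed trigger (not the actual ReduceTask code): the class name, the use of plain byte counts for segments, and the single background merge thread are hypothetical stand-ins.

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;

        // Hypothetical stand-in for the proposed behaviour: segments are modelled
        // as byte counts and "merging" just sums them; the real code would merge
        // on-disk map output segments.
        class ParallelOnDiskMerger {
          private final int ioSortFactor;                              // io.sort.factor
          private final List<Long> onDiskSegments = new ArrayList<Long>();
          private final ExecutorService mergePool = Executors.newSingleThreadExecutor();

          ParallelOnDiskMerger(int ioSortFactor) {
            this.ioSortFactor = ioSortFactor;
          }

          // Called by the copier thread whenever a map output is written to disk.
          synchronized void mapOutputWrittenToDisk(long bytes) {
            onDiskSegments.add(bytes);
            if (onDiskSegments.size() >= ioSortFactor) {
              // Hand io.sort.factor segments to a background merge, in parallel
              // with further copying.
              final List<Long> toMerge =
                  new ArrayList<Long>(onDiskSegments.subList(0, ioSortFactor));
              onDiskSegments.subList(0, ioSortFactor).clear();
              mergePool.submit(new Runnable() {
                public void run() {
                  long merged = 0;
                  for (long b : toMerge) {
                    merged += b;                     // stand-in for the actual merge
                  }
                  mergedSegmentReady(merged);
                }
              });
            }
          }

          // The big merged segment rejoins the pool of on-disk segments, so at the
          // end of copying only a small number of segments remain for the final merge.
          private synchronized void mergedSegmentReady(long bytes) {
            onDiskSegments.add(bytes);
          }
        }

      A single merge thread keeps disk contention low; more merge threads could be added if copying outpaces merging, as the description suggests.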

      Attachments

      1. HADOOP-910-review.patch (13 kB) - Amar Kamat
      2. HADOOP-910.patch (14 kB) - Amar Kamat
      3. HADOOP-910.patch (14 kB) - Amar Kamat
      4. HADOOP-910.patch (13 kB) - Amar Kamat

        Activity

        Owen O'Malley made changes -
        Component/s mapred [ 12310690 ]
        Nigel Daley made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Devaraj Das made changes -
        Hadoop Flags [Reviewed]
        Release Note Reducers now perform merges of shuffle data (both in-memory and on disk) while fetching map outputs. Earlier, during shuffle they used to merge only the in-memory outputs.
        Amar Kamat added a comment -

        This patch would help in the following settings:
        1) Low fs.inmemory.size.mb: with a small ramfs, more files end up on disk (either because outputs miss the ramfs or because the in-memory merges complete faster).
        While the shuffle phase is still running, the reducer can then start merging the on-disk files in parallel. Likewise, with sufficient ramfs but a huge
        number of maps, there will be lots of files on disk (the more merges in ramfs, the more files on disk). A premature merge under such conditions will definitely help.
        2) Low io.sort.factor: in such cases the on-disk merge will kick in sooner. One on-disk merge requires 2*io.sort.factor - 1 files to be on disk (with io.sort.factor=10, for example, an on-disk merge can start once 19 files have accumulated). The sketch after this comment spells these settings out.
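        To make the two settings concrete, a hedged sketch using the Configuration API; the property names are the ones discussed in this issue, while the class name and the values are merely illustrative of a "low" configuration.

          import org.apache.hadoop.conf.Configuration;

          public class LowShuffleSettings {
            public static Configuration lowRamfsLowFactor() {
              Configuration conf = new Configuration();
              // Small in-memory filesystem: more map outputs spill to disk, so the
              // on-disk merge has more to do (value is illustrative only).
              conf.setInt("fs.inmemory.size.mb", 32);
              // Low merge factor: an on-disk merge can start once
              // 2 * io.sort.factor - 1 = 19 files are on disk.
              conf.setInt("io.sort.factor", 10);
              return conf;
            }
          }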

        Amar Kamat added a comment -

        You will gain from on-disk merging if

        1) your map outputs are larger than the in-memory filesystem
        2) the reducers are idle (waiting for the maps), for example because
              2.1) too many trackers are lost, i.e. re-execution
              2.2) the network is loaded
              2.3) there are waves of maps

        In case of (1) and (2.3) there is a definite gain, whereas (2.1) and (2.2) are opportunistic cases.

        Mahadev konar added a comment -

        I read through the description and comments. Amar, can you explain with what parameter changes this patch would show performance enhancements? Meaning, for what "practical" parameters would this patch help performance? I need that to do some performance measurements. Thanks.

        Hudson added a comment -

        Integrated in Hadoop-trunk #418 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/418/ )
        Devaraj Das made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Fix Version/s 0.17.0 [ 12312913 ]
        Devaraj Das added a comment -

        I just committed this. Thanks, Amar!

        Amar Kamat added a comment -

        The core tests failed on TestDU, but TestDU passes on my system. Also, I am not sure how TestDU is related to my patch.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12376894/HADOOP-910.patch
        against trunk revision 619744.

        @author +1. The patch does not contain any @author tags.

        tests included -1. The patch doesn't appear to include any new or modified tests.
        Please justify why no tests are needed for this patch.

        javadoc +1. The javadoc tool did not generate any warning messages.

        javac +1. The applied patch does not generate any new javac compiler warnings.

        release audit +1. The applied patch does not generate any new release audit warnings.

        findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

        core tests -1. The patch failed core unit tests.

        contrib tests +1. The patch passed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/testReport/
        Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/console

        This message is automatically generated.

        Amar Kamat made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Amar Kamat made changes -
        Status Patch Available [ 10002 ] Open [ 1 ]
        Amar Kamat made changes -
        Attachment HADOOP-910.patch [ 12376894 ]
        Amar Kamat added a comment -

        Attaching a patch that respects HADOOP-1986.

        Devaraj Das added a comment -

        Sorry, this patch conflicts with the just-committed HADOOP-1986 in the way the sorter for the localfs merger is created. Please submit a new patch.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12376230/HADOOP-910.patch
        against trunk revision 619744.

        @author +1. The patch does not contain any @author tags.

        tests included -1. The patch doesn't appear to include any new or modified tests.
        Please justify why no tests are needed for this patch.

        javadoc +1. The javadoc tool did not generate any warning messages.

        javac +1. The applied patch does not generate any new javac compiler warnings.

        release audit +1. The applied patch does not generate any new release audit warnings.

        findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

        core tests +1. The patch passed core unit tests.

        contrib tests +1. The patch passed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/testReport/
        Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/console

        This message is automatically generated.

        Amar Kamat added a comment -

        I ran the sort benchmark with

        • 500 nodes
        • io.sort.factor=10
        • java heap size: 512mb

        and the results are as follows

        source    run time
        trunk     1 hr 33 min
        patched   1 hr 31 min
        Amar Kamat made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Amar Kamat added a comment -

        Incorporated Devaraj's suggestions. Submitting a patch.

        Amar Kamat made changes -
        Status Patch Available [ 10002 ] Open [ 1 ]
        Amar Kamat made changes -
        Attachment HADOOP-910.patch [ 12376230 ]
        Devaraj Das added a comment -

        Code looks good. Some comments:
        1) The path returned by the localfs merge need not be converted to a fully qualified path since these files are guaranteed to be in the localfs
        2) The localfs merge is passed an array of paths to merge. The paths are initially stored in a List and an array is then obtained from that List. The List can be eliminated.
        3) The check for where a map output file finally went to should be based on the filesystem of the path returned.

        Amar Kamat added a comment -

        I ran the sort benchmark with

        • 200 nodes
        • io.sort.factor=10
        • java heap size: 512mb
          and the results are as follows
          source        run-1        run-2
          trunk-sort    1 hr 7 min   1 hr 2 min
          patched-sort  1 hr 6 min   1 hr 2 min

          For a heap size of 200 MB I think we have HADOOP-2751.

        Doug Cutting added a comment -

        > java heap size: 1024mb

        Is a 1024MB heap required for the sort? Or have you also increased other buffer sizes?

        With default settings, we should run in the default heap size, which is currently 200MB for task processes. (The default heap size for daemons is currently 1000MB, but this patch only affects task processes.)

        Mukund Madhugiri added a comment -

        I ran the sort benchmark with

        • 100 nodes
        • io.sort.factor=10
        • java heap size: 1024mb

        Here are the results:

        • Sort on trunk: 29.3 min
        • Sort on trunk + patch: 29.05 min
        Amar Kamat added a comment -

        > findbugs -1. The patch appears to introduce 1 new Findbugs warnings

        HADOOP-1152 introduced the code

        bytes = -1;
        

        in ReduceTask.java (line 821), which is what findbugs reported here. I am not sure it has anything to do with my patch.

        > tests included -1. The patch doesn't appear to include any new or modified tests.
        > Please justify why no tests are needed for this patch.

        All the tests that validate sort (like TestCollect) are test cases for HADOOP-910, hence no special test case is required.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12375831/HADOOP-910.patch
        against trunk revision 619744.

        @author +1. The patch does not contain any @author tags.

        tests included -1. The patch doesn't appear to include any new or modified tests.
        Please justify why no tests are needed for this patch.

        javadoc +1. The javadoc tool did not generate any warning messages.

        javac +1. The applied patch does not generate any new javac compiler warnings.

        release audit +1. The applied patch does not generate any new release audit warnings.

        findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

        core tests +1. The patch passed core unit tests.

        contrib tests +1. The patch passed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/testReport/
        Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/console

        This message is automatically generated.

        Amar Kamat added a comment -

        The only change is that there is a check for local jobs.

        Amar Kamat made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Amar Kamat added a comment -

        Submitting the patch.

        Amar Kamat made changes -
        Attachment HADOOP-910.patch [ 12375831 ]
        Amar Kamat added a comment - edited

        I observed that this patch performs badly if more tasks are run simultaneously on a machine. The earlier runs ran 8 tasks simultaneously. In these runs I set mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum to 2.

        I think the performance degradation with mapred.tasktracker.map/reduce.tasks.maximum = 4 is transient. I ran a job with the same settings mentioned earlier but with mapred.tasktracker.map/reduce.tasks.maximum = 4, and the results are as follows

        run   trunk + patch   trunk
        1     1 hr 2 min      1 hr 4 min
        2     1 hr 2 min      1 hr 5 min

        Just to be sure that the patch doesn't degrade performance, I ran a job with the following config:

        Number of nodes : 200
        Java heap size : 1024mb
        io.sort.factor : 100
        

        Following are the results
        With mapred.tasktracker.map/reduce.tasks.maximum=2

        run   trunk + patch   trunk
        1     1 hr 6 min      1 hr 7 min
        2     1 hr 11 min     1 hr 8 min
        3     1 hr 7 min      1 hr 9 min

        With mapred.tasktracker.map/reduce.tasks.maximum=4

        run   trunk + patch   trunk
        1     1 hr 14 min     1 hr 16 min
        2     1 hr 16 min     1 hr 16 min
        3     1 hr 18 min     1 hr 17 min

        This is expected because the on-disk merge rarely occurs with io.sort.factor = 100, i.e. performance is not degraded when there are no (or very few) on-disk merges.


        Hardware configuration :
        Processor : 4x HT 1.8GHz Intel Xeon
        Ram : 8GB
        Disk : 4 disks each of 250GB

        Amar Kamat added a comment -

        Here are the results from a fresh run

        Number of nodes : 200
        Java heap size : 1024mb
        io.sort.factor : 10
        
        run   with patch   trunk
        1     1 hr 4 min   1 hr 16 min
        2     1 hr 4 min   1 hr 16 min
        3     1 hr 6 min   1 hr 16 min

        I observed that this patch performs badly if more tasks are run simultaneously on a machine. The earlier runs ran 8 tasks simultaneously. In these runs I set mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum to 2.

        Amar Kamat made changes -
        Attachment HADOOP-910-review.patch [ 12373598 ]
        Amar Kamat added a comment -

        This patch performs a bit worse than trunk when the ram-filesystem merge and the local-filesystem merge co-exist. I guess that is due to interference between the two threads, which is evident from the logs. Here is the analysis of the logs:

        local fs merge time (min)   interfering ramfs merge threads
        4.24155                     26
        0.0887667                    6
        0.201867                     8
        0.311233                     8
        0.0618333                    6
        3.12602                     48
        4.00395                     48
        0.0716333                    6
        0.02535                      8
        0.0760667                    6
        4.6852                      38
        6.95463                     58
        1.07183                     34
        3.35935                     60
        1.46228                      6

        Here are the results of running the benchmarks with fs.inmemory.size.mb=0 (i.e. no ramfs) on 100 nodes:

        source    total runtime   avg shuffle time
        patched   1h 25m 42s      26m 7s
        trunk     1h 58m 59s      30m 21s

        Comments?

        Devaraj Das made changes -
        Assignee Gautam Kowshik [ gautamk ] Amar Kamat [ amar_kamat ]
        Doug Cutting added a comment -

        Runping: what version of Hadoop did you see this on?

        Runping Qi added a comment -

        The observation I made was this: the map phase took 6+ hours (I cannot recall the exact number).
        During the map phase, each reducer copied the 8400+ map output files to its local directory.
        This overlapped well with the map phase. However, the merging of these 8400+ files into 100 runs
        that can feed the reduce phase did not start until all the files were copied
        (which was after all the mappers were done). That merge phase could have started much earlier, working
        concurrently with the copying phase, and thus could have been completed much earlier (in this specific case, the merge phase took about 8 hours!).

        Sameer Paranjpye added a comment -

        What was your specific observation, Runping? We're already doing a lot of in-memory merges in parallel with the shuffle, and from all the runs we've seen it looks like the shuffle/merge tracks the maps pretty closely. Can we get some real data here? Because this feels like premature optimization.

        Gautam Kowshik made changes -
        Assignee Gautam Kowshik [ gautamk ]
        Runping Qi added a comment -

        I just tried a large job with 8400+ mappers.
        It is clear that overlapping copying and merging will pay off a lot.
        Parallelizing the merging will pay off too.
        The main merge thread should just keep track of the mergeable files and
        start an actual merge thread whenever a merge is warranted, up to a predefined limit of merge threads.
        The optimization discussed in earlier comments is still applicable, i.e. try to merge small files and avoid merging large files as much as possible.

        Runping Qi added a comment -

        Even when a single thread cannot use all the disk capacity, multithreading helps only if the network throughput is higher than what a single thread can handle.

        Raghu Angadi added a comment -

        > I suspect multiple merging threads may not buy you too much since disk i/o will
        > be the bottleneck of a merging thread, unless a single merging cannot fully
        > utilize all the available disk resources.

        A single thread usually cannot use all the disk capacity, even more so in multi-disk environments, unless the thread does heavy async I/O.

        Runping Qi added a comment -

        This proposal will definitely improve the sorting phase on the reducer side.
        I have one suggestion. Instead of starting a merge greedily as soon as a Reduce has
        collected io.sort.factor segments on disk, you should wait
        until the Reducer has collected close to 2 * io.sort.factor - 1 segments.
        This way, you can always choose the io.sort.factor smallest segments
        to merge and avoid unnecessarily merging large segments.
        This will also result in more balanced final segments (see the sketch after this comment).
        I suspect multiple merging threads may not buy you too much, since disk I/O will
        be the bottleneck of a merging thread, unless a single merge cannot fully
        utilize all the available disk resources.
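        A minimal sketch of this heuristic, assuming a hypothetical SmallestFirstMergePolicy class and segment sizes modelled as plain byte counts (not actual Hadoop code):

          import java.util.PriorityQueue;

          // Hypothetical illustration: wait for about 2 * io.sort.factor - 1 on-disk
          // segments, then merge the io.sort.factor smallest ones so that large
          // segments are not re-merged unnecessarily.
          class SmallestFirstMergePolicy {
            private final int ioSortFactor;
            private final PriorityQueue<Long> segmentSizes = new PriorityQueue<Long>();

            SmallestFirstMergePolicy(int ioSortFactor) {
              this.ioSortFactor = ioSortFactor;
            }

            void segmentAdded(long bytes) {
              segmentSizes.add(bytes);
              if (segmentSizes.size() >= 2 * ioSortFactor - 1) {
                long merged = 0;
                // Pop the io.sort.factor smallest segments and fold them into one.
                for (int i = 0; i < ioSortFactor; i++) {
                  merged += segmentSizes.poll();     // stand-in for the real merge
                }
                segmentSizes.add(merged);
              }
            }
          }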

        Devaraj Das created issue -

          People

          • Assignee: Amar Kamat
          • Reporter: Devaraj Das
          • Votes: 0
          • Watchers: 0
