
Key: HADOOP-910
Type: Improvement
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Amar Kamat
Reporter: Devaraj Das
Votes: 0
Watchers: 0
Hadoop Common

Reduces can do merges for the on-disk map output files in parallel with their copying

Created: 19/Jan/07 06:44 AM   Updated: 08/Jul/09 04:52 PM
Component/s: None
Affects Version/s: None
Fix Version/s: 0.17.0

Time Tracking:
Not Specified

File Attachments (text files, licensed for inclusion in ASF works):
Name                      Date                  Author       Size
HADOOP-910-review.patch   2008-01-19 10:05 AM   Amar Kamat   13 kB
HADOOP-910.patch          2008-03-01 07:09 AM   Amar Kamat   13 kB
HADOOP-910.patch          2008-02-22 02:32 PM   Amar Kamat   14 kB
HADOOP-910.patch          2008-02-18 12:52 PM   Amar Kamat   14 kB

Hadoop Flags: Reviewed
Release Note: Reducers now perform merges of shuffle data (both in-memory and on disk) while fetching map outputs. Earlier, during shuffle they used to merge only the in-memory outputs.
Resolution Date: 02/Mar/08 06:10 PM


Description
Proposal to extend the parallel in-memory-merge/copying, that is being done as part of HADOOP-830, to the on-disk files.

Today, the Reduces dump the map output files to disk, and the final merge happens only after all the map outputs have been collected. It might make sense to parallelize this part. That is, whenever a Reduce has collected io.sort.factor segments on disk, it initiates a merge of those and creates one big segment. If copying is faster than merging, we can probably have multiple threads doing parallel merges of independent sets of io.sort.factor segments. If copying is slower than merging, we stand to gain a lot: at the end of copying all the map outputs, we will be left with a small number of segments for the final merge, which hopefully will feed the reduce directly (via the RawKeyValueIterator) without having to hit the disk to write additional output segments.
If the disk bandwidth is higher than the network bandwidth, we have a good story, I guess, for doing such a thing.
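
The effect described above can be sketched as a toy simulation. This is only a minimal sketch, not the code of any patch: the class and method names are hypothetical, segments are merely counted (sizes, timing and in-memory merges are ignored), and a merged segment is assumed to count like any other on-disk segment.

```java
/** Illustrative only: counts on-disk segments while map outputs are copied. */
final class OverlappedMergeSketch {

    /**
     * Simulates map outputs landing on disk one at a time. Whenever
     * {@code factor} segments are present, they are merged into one.
     * Returns the number of segments left for the final merge.
     */
    static int segmentsLeftAfterCopy(int mapOutputs, int factor) {
        if (mapOutputs < 0 || factor < 2) {
            throw new IllegalArgumentException("need mapOutputs >= 0 and factor >= 2");
        }
        int onDisk = 0;
        for (int copied = 0; copied < mapOutputs; copied++) {
            onDisk++;                 // one more map output copied to disk
            if (onDisk == factor) {
                onDisk = 1;           // factor segments merged into one big segment
            }
        }
        return onDisk;
    }

    public static void main(String[] args) {
        // 8400 map outputs, io.sort.factor = 10 (illustrative values)
        System.out.println(segmentsLeftAfterCopy(8400, 10));
    }
}
```

Under these assumptions, 8400 map outputs with a factor of 10 leave 3 segments for the final merge, whereas without any merging during the copy all 8400 would remain.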



Runping Qi added a comment - 19/Jan/07 02:29 PM

This proposal will definitely improve the sorting phase on the reducer side.
I have one suggestion. Instead of starting a merge greedily as soon as a Reduce has
collected io.sort.factor segments on disk, you should wait
until the Reducer has collected close to 2 * io.sort.factor - 1 segments.
This way, you can always choose the io.sort.factor smallest segments
to merge and avoid unnecessarily merging large segments.
This will also result in more balanced final segments.
I suspect multiple merging threads may not buy you too much since disk i/o will
be the bottleneck of a merging thread, unless a single merging cannot fully
utilize all the available disk resources.
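
The selection rule suggested here could look roughly like the following. It is a sketch under stated assumptions: the class and method names are hypothetical, segments are represented only by their sizes in bytes, and the threshold is taken to be exactly 2 * io.sort.factor - 1 rather than "close to" it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only: picks which on-disk segments to merge next. */
final class SmallestSegmentsSketch {

    /**
     * Returns the sizes of the {@code factor} smallest segments once at
     * least 2 * factor - 1 segments are on disk. Returns an empty list
     * otherwise, meaning "keep copying, do not merge yet".
     */
    static List<Long> pickForMerge(List<Long> segmentSizes, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be >= 2");
        }
        if (segmentSizes.size() < 2 * factor - 1) {
            return Collections.emptyList();
        }
        List<Long> sorted = new ArrayList<>(segmentSizes);
        Collections.sort(sorted);                       // smallest first
        return new ArrayList<>(sorted.subList(0, factor));
    }

    public static void main(String[] args) {
        // Five segments on disk, factor 3: threshold 2 * 3 - 1 = 5 is reached.
        List<Long> sizes = Arrays.asList(50L, 3L, 9L, 1L, 7L);
        System.out.println(pickForMerge(sizes, 3));
    }
}
```

With the sizes above, the large 50-byte segment (possibly the result of an earlier merge) is left alone and only the three smallest segments are merged, which is the point of waiting for more than io.sort.factor segments.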


Raghu Angadi added a comment - 19/Jan/07 06:36 PM
> I suspect multiple merging threads may not buy you too much since disk i/o will
> be the bottleneck of a merging thread, unless a single merging cannot fully
> utilize all the available disk resources.

A single thread usually cannot use all the disk capacity, even more so in multi-disk environments, unless the thread does heavy async I/O.


Runping Qi added a comment - 19/Jan/07 07:56 PM

Even when a single thread cannot use all the disk capacity, multithreading helps only if the network throughput is higher than what a single thread can handle.


Runping Qi added a comment - 09/Feb/07 06:51 PM

I just tried a large job with 8400+ mappers.
It is clear that overlapping copying and merging will pay off a lot.
Parallelizing the merging will pay off too.
The main merge thread should just keep track of the mergeable files and
start an actual merge thread whenever a merge is warranted, up to a predefined limit of merge threads.
The optimization discussed in earlier comments is still applicable, i.e. try to merge small files and avoid merging large files as much as possible.
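
One way to picture the bounded pool of merge threads described above is the sketch below. It is an assumption-laden illustration: the names are hypothetical, a "merge" is simulated by summing segment sizes instead of reading and writing files, and batches are formed in arrival order rather than by picking the smallest files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: a coordinator handing merges to a bounded thread pool. */
final class BoundedMergeSketch {

    /**
     * Splits the segments into batches of at most {@code factor} and merges
     * each batch on a pool of at most {@code maxMergeThreads} threads.
     * Returns the merged segment sizes in batch order.
     */
    static List<Long> mergeInBatches(List<Long> segmentSizes, int factor, int maxMergeThreads) {
        if (factor < 2 || maxMergeThreads < 1) {
            throw new IllegalArgumentException("need factor >= 2 and maxMergeThreads >= 1");
        }
        ExecutorService pool = Executors.newFixedThreadPool(maxMergeThreads);
        try {
            List<Future<Long>> pending = new ArrayList<>();
            for (int start = 0; start < segmentSizes.size(); start += factor) {
                int end = Math.min(start + factor, segmentSizes.size());
                final List<Long> batch = new ArrayList<>(segmentSizes.subList(start, end));
                // Stand-in for a real merge: the output size is the sum of the inputs.
                Callable<Long> merge = () -> {
                    long total = 0;
                    for (long size : batch) {
                        total += size;
                    }
                    return total;
                };
                pending.add(pool.submit(merge));
            }
            List<Long> merged = new ArrayList<>();
            for (Future<Long> result : pending) {
                merged.add(result.get());   // wait for each merge to finish
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while merging", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("merge task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(mergeInBatches(Arrays.asList(1L, 2L, 3L, 4L, 5L), 2, 2));
    }
}
```

The fixed-size pool plays the role of the predefined limit on merge threads: extra merges queue up instead of adding more concurrent disk I/O.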


Sameer Paranjpye added a comment - 13/Mar/07 07:56 PM
What was your specific observation, Runping? We're already doing a lot of in-memory merges in parallel with the shuffle, and from all the runs we've seen it looks like the shuffle/merge tracks the maps pretty closely. Can we get some real data here? This feels like premature optimization.

Runping Qi added a comment - 19/Mar/07 11:33 PM

The observation I made was this: the map phase took 6+ hours (I cannot recall the exact number).
During the map phase, each reducer copied the 8400+ map output files to its local directory.
This overlapped well with the map phase. However, merging these 8400+ files into 100 runs
that can feed the reduce phase did not start until all the files were copied
(which was after all the mappers were done). That merge phase could have started much earlier, working
concurrently with the copying phase, and thus could have completed much earlier (in this specific case, the merge phase took about 8 hours!).


Doug Cutting added a comment - 19/Mar/07 11:40 PM
Runping: what version of Hadoop did you see this on?

Amar Kamat added a comment - 19/Jan/08 10:05 AM
This patch performs a bit worse than trunk when ram-filesystem merges and local-filesystem merges co-exist. I guess that is due to the interference between the two threads, which is evident from the logs. Here is the analysis of the logs:

local fs merge time (min)   num interfering ramfs merge threads
4.24155                     26
0.0887667                   6
0.201867                    8
0.311233                    8
0.0618333                   6
3.12602                     48
4.00395                     48
0.0716333                   6
0.02535                     8
0.0760667                   6
4.6852                      38
6.95463                     58
1.07183                     34
3.35935                     60
1.46228                     6

Here are the results of running the benchmarks with fs.inmemory.size.mb=0 (i.e. no ramfs) on 100 nodes:

          total runtime   avg shuffle time
patched   1h 25m 42s      26m 7s
trunk     1h 58m 59s      30m 21s

comments?


Amar Kamat added a comment - 13/Feb/08 07:00 PM
Here are the results from a fresh run
Number of nodes : 200
Java heap size : 1024mb
io.sort.factor : 10

#   with patch   trunk
1   1hr 4min     1hr 16min
2   1hr 4min     1hr 16min
3   1hr 6min     1hr 16min

I observed that this patch performs badly if more tasks are run simultaneously on a machine. The earlier runs ran 8 tasks simultaneously. In these runs I set mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum to 2.


Amar Kamat added a comment - 15/Feb/08 06:14 AM - edited

> I observed that this patch performs badly if more tasks are run simultaneously on a machine. The earlier runs ran 8 tasks simultaneously. In these runs I set mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum to 2.

I think the performance degradation with mapred.tasktracker.map/reduce.tasks.maximum = 4 is transient. I ran a job with the same settings mentioned earlier but with mapred.tasktracker.map/reduce.tasks.maximum = 4, and the results are as follows:

#   trunk + patch   trunk
1   1hr 2min        1hr 4min
2   1hr 2min        1hr 5min

Just to be sure that the patch does not degrade performance, I ran a job with the following config:

Number of nodes : 200
Java heap size : 1024mb
io.sort.factor : 100

Following are the results
With mapred.tasktracker.map/reduce.tasks.maximum=2

#   trunk + patch   trunk
1   1hr 6min        1hr 7min
2   1hr 11min       1hr 8min
3   1hr 7min        1hr 9min

With mapred.tasktracker.map/reduce.tasks.maximum=4

#   trunk + patch   trunk
1   1hr 14min       1hr 16min
2   1hr 16min       1hr 16min
3   1hr 18min       1hr 17min

This is expected because on-disk merges rarely occur with io.sort.factor = 100, i.e. performance is not degraded when there are no (or very few) on-disk merges.


Hardware configuration :
Processor : 4x HT 1.8GHz Intel Xeon
Ram : 8GB
Disk : 4 disks each of 250GB


Amar Kamat added a comment - 18/Feb/08 12:54 PM
Submitting the patch.

Amar Kamat added a comment - 18/Feb/08 01:44 PM
The only change is that there is a check for local jobs.

Hadoop QA added a comment - 18/Feb/08 02:00 PM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12375831/HADOOP-910.patch
against trunk revision 619744.

@author +1. The patch does not contain any @author tags.

tests included -1. The patch doesn't appear to include any new or modified tests.
Please justify why no tests are needed for this patch.

javadoc +1. The javadoc tool did not generate any warning messages.

javac +1. The applied patch does not generate any new javac compiler warnings.

release audit +1. The applied patch does not generate any new release audit warnings.

findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

core tests +1. The patch passed core unit tests.

contrib tests +1. The patch passed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/console

This message is automatically generated.


Amar Kamat added a comment - 18/Feb/08 03:58 PM

> findbugs -1. The patch appears to introduce 1 new Findbugs warnings

HADOOP-1152 introduced the code

bytes = -1;

in ReduceTask.java (line 821), which is what findbugs reported here. I am not sure it has anything to do with my patch.

> tests included -1. The patch doesn't appear to include any new or modified tests.
> Please justify why no tests are needed for this patch.

All the tests that validate sort (like TestCollect) are test cases for HADOOP-910, hence no special test case is required.


Mukund Madhugiri added a comment - 20/Feb/08 01:19 AM
I ran the sort benchmark with
  • 100 nodes
  • io.sort.factor=10
  • java heap size: 1024mb

Here are the results:

  • Sort on trunk: 29.3 min
  • Sort on trunk + patch: 29.05 min

Doug Cutting added a comment - 20/Feb/08 05:50 PM
> java heap size: 1024mb

Is a 1024MB heap required for the sort? Or have you also increased other buffer sizes?

With default settings, we should run in the default heap size, which is currently 200MB for task processes. (The default heap size for daemons is currently 1000MB, but this patch only affects task processes.)


Amar Kamat added a comment - 21/Feb/08 07:50 PM
I ran the sort benchmark with
  • 200 nodes
  • io.sort.factor=10
  • java heap size: 512mb

and the results are as follows:

source         run-1      run-2
trunk-sort     1hr 7m     1hr 2m
patched-sort   1hr 6m     1hr 2m

For a heap size of 200mb, I think we have HADOOP-2751.


Devaraj Das added a comment - 22/Feb/08 08:07 AM
Code looks good. Some comments:
1) The path returned by the localfs merge need not be converted to a fully qualified path since these files are guaranteed to be in the localfs
2) The localfs merge is passed an array of paths to merge. The paths are initially stored in a List and an array is then obtained from that List. The List can be eliminated.
3) The check for where a map output file finally went to should be based on the filesystem of the path returned.

Amar Kamat added a comment - 22/Feb/08 02:34 PM
Considering Devaraj's suggestions. Submitting a patch.

Amar Kamat added a comment - 22/Feb/08 05:11 PM
I ran the sort benchmark with
  • 500 nodes
  • io.sort.factor=10
  • java heap size: 512mb

and the results are as follows

source    run time
trunk     1hr 33m
patched   1hr 31m

Hadoop QA added a comment - 27/Feb/08 02:31 AM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12376230/HADOOP-910.patch
against trunk revision 619744.

@author +1. The patch does not contain any @author tags.

tests included -1. The patch doesn't appear to include any new or modified tests.
Please justify why no tests are needed for this patch.

javadoc +1. The javadoc tool did not generate any warning messages.

javac +1. The applied patch does not generate any new javac compiler warnings.

release audit +1. The applied patch does not generate any new release audit warnings.

findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

core tests +1. The patch passed core unit tests.

contrib tests +1. The patch passed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/console

This message is automatically generated.


Devaraj Das added a comment - 28/Feb/08 07:35 PM
Sorry, this patch conflicts with the just-committed patch HADOOP-1986 in the way the sorter for the localfs merger is created. Please submit a new patch.

Amar Kamat added a comment - 01/Mar/08 07:09 AM
Attaching a patch that respects HADOOP-1986.

Hadoop QA added a comment - 01/Mar/08 08:44 AM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12376894/HADOOP-910.patch
against trunk revision 619744.

@author +1. The patch does not contain any @author tags.

tests included -1. The patch doesn't appear to include any new or modified tests.
Please justify why no tests are needed for this patch.

javadoc +1. The javadoc tool did not generate any warning messages.

javac +1. The applied patch does not generate any new javac compiler warnings.

release audit +1. The applied patch does not generate any new release audit warnings.

findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

core tests -1. The patch failed core unit tests.

contrib tests +1. The patch passed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/console

This message is automatically generated.


Amar Kamat added a comment - 01/Mar/08 03:16 PM
Core test failed on TestDU but passed on my system. Also not sure how TestDU is related to my patch.

Devaraj Das added a comment - 02/Mar/08 06:10 PM
I just committed this. Thanks, Amar!

Hudson added a comment - 03/Mar/08 12:35 PM

Mahadev konar added a comment - 05/Mar/08 07:30 PM
I read through the description and comments. Amar, can you explain with what parameter changes this patch would show performance enhancements? That is, for what "practical" parameters would this patch help performance? I need that to do some performance measurements. Thanks.

Amar Kamat added a comment - 06/Mar/08 05:08 AM
You will gain from on-disk merging if
1) your map outputs are larger than the in-memory filesystem
2) the reducers are idle (waiting for the maps), e.g. when
      2.1) there are too many lost trackers, i.e. re-execution
      2.2) the network is loaded
      2.3) there are waves of maps.

In case of (1) and (2.3) there is a definite gain but (2.1) and (2.2) are opportunistic cases.


Amar Kamat added a comment - 06/Mar/08 01:39 PM
This patch would help in the following settings
1) low fs.inmemory.size.mb : With a small ramfs, more files will be present on the disk (due to ramfs misses and also due to faster merges).
So while the shuffle phase is happening, the reducer can simultaneously start merging the disk files. Also, in cases with sufficient ramfs but a huge
number of maps, there will be lots of files on disk (the more merges in ramfs, the more files on disk). An early merge under such conditions will definitely help.
2) low io.sort.factor : In such cases the on-disk merge will kick in sooner. One on-disk merge requires 2*io.sort.factor - 1 files to be on disk.