> I suspect multiple merging threads may not buy you too much since disk i/o will
> be the bottleneck of a merging thread, unless a single merging cannot fully
> utilize all the available disk resources.

A single thread usually cannot use all disk capacity, even more so in multi-disk environments, unless the thread does heavy async i/o. Even when a single thread cannot use all disk capacity, multithreading helps only if the network throughput is higher than a single thread can handle.
I just tried a large job with 8400+ mappers.
What was your specific observation, Runping? We're already doing a lot of in-memory merges in parallel with the shuffle, and from all the runs we've seen it looks like the shuffle/merge tracks the maps pretty closely. Can we get some real data here? This feels like premature optimization.
The observation I made was this: the map phase took 6+ hours (I cannot recall the exact number).
Runping: what version of Hadoop did you see this on?
This patch performs slightly worse than trunk when the ram-filesystem merge and the local-filesystem merge co-exist. I guess that is due to interference between the two threads, which is evident from the logs. Here is the analysis of the logs
Here are the results of running the benchmarks with fs.inmemory.size.mb=0 (i.e. no ramfs) on 100 nodes
Comments?
Here are the results from a fresh run
Number of nodes : 200
Java heap size : 1024mb
io.sort.factor : 10
I observed that this patch performs badly if more tasks are run simultaneously on a machine. The earlier runs ran 8 tasks simultaneously. In these runs I set mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum to 2.
I think the performance degradation with mapred.tasktracker.map/reduce.tasks.maximum = 4 is transient. I ran a job with the same settings mentioned earlier but with mapred.tasktracker.map/reduce.tasks.maximum = 4 and the results are as follows
Just to be sure that the patch doesn't degrade performance, I ran a job with the following config:
Number of nodes : 200
Java heap size : 1024mb
io.sort.factor : 100
Following are the results
With mapred.tasktracker.map/reduce.tasks.maximum=4
This is expected because on-disk merges rarely occur with io.sort.factor = 100, i.e. the performance is not degraded with no (or very few) on-disk merges.
Hardware configuration :
The only change is that there is a check for local jobs.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12375831/HADOOP-910.patch
against trunk revision 619744.
@author +1. The patch does not contain any @author tags.
tests included -1. The patch doesn't appear to include any new or modified tests.
javadoc +1. The javadoc tool did not generate any warning messages.
javac +1. The applied patch does not generate any new javac compiler warnings.
release audit +1. The applied patch does not generate any new release audit warnings.
findbugs -1. The patch appears to introduce 1 new Findbugs warnings.
core tests +1. The patch passed core unit tests.
contrib tests +1. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1814/testReport/
This message is automatically generated.
bytes = -1; in ReduceTask.java (line 821), which Findbugs reported here. I am not sure it has anything to do with my patch.
All the tests that validate sort (like TestCollect) are test cases for
I ran the sort benchmark with
Here are the results:
> java heap size: 1024mb
Is a 1024MB heap required for the sort? Or have you also increased other buffer sizes? With default settings, we should run in the default heap size, which is currently 200MB for task processes. (The default heap size for daemons is currently 1000MB, but this patch only affects task processes.)
I ran the sort benchmark with
Code looks good. Some comments:
1) The path returned by the localfs merge need not be converted to a fully qualified path, since these files are guaranteed to be in the localfs.
2) The localfs merge is passed an array of paths to merge. The paths are initially stored in a List and an array is then obtained from that List. The List can be eliminated.
3) The check for where a map output file finally went should be based on the filesystem of the path returned.
Considering Devaraj's suggestions. Submitting a patch.
I ran the sort benchmark with
and the results are as follows
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12376230/HADOOP-910.patch
against trunk revision 619744.
@author +1. The patch does not contain any @author tags.
tests included -1. The patch doesn't appear to include any new or modified tests.
javadoc +1. The javadoc tool did not generate any warning messages.
javac +1. The applied patch does not generate any new javac compiler warnings.
release audit +1. The applied patch does not generate any new release audit warnings.
findbugs -1. The patch appears to introduce 1 new Findbugs warnings.
core tests +1. The patch passed core unit tests.
contrib tests +1. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1835/testReport/
This message is automatically generated.
Sorry, this patch conflicts with the just committed patch
Attaching a patch that respects
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12376894/HADOOP-910.patch
against trunk revision 619744.
@author +1. The patch does not contain any @author tags.
tests included -1. The patch doesn't appear to include any new or modified tests.
javadoc +1. The javadoc tool did not generate any warning messages.
javac +1. The applied patch does not generate any new javac compiler warnings.
release audit +1. The applied patch does not generate any new release audit warnings.
findbugs -1. The patch appears to introduce 1 new Findbugs warnings.
core tests -1. The patch failed core unit tests.
contrib tests +1. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1882/testReport/
This message is automatically generated.
The core tests failed on TestDU but passed on my system. I am also not sure how TestDU is related to my patch.
I just committed this. Thanks, Amar!
Integrated in Hadoop-trunk #418 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/418/)
I read through the description and comments. Amar, can you explain with what parameter changes this patch would show performance enhancements? Meaning, for what "practical" parameters would this patch help performance? I need that to do some performance measurements. Thanks.
You will gain from on-disk merging if
1) your map outputs are larger than the in-memory filesystem
2) if the reducers are idle (waiting for the maps)
2.1) too many lost trackers, i.e. re-execution
2.2) network is loaded
2.3) there are waves of maps.
In case of (1) and (2.3) there is a definite gain, but (2.1) and (2.2) are opportunistic cases.
This patch would help in the following settings
1) low fs.inmemory.size.mb : With a small ramfs, more files will be present on disk (due to ramfs misses and also due to faster merges). So while the shuffle phase is happening, the reducer can simultaneously start merging the disk files. Also, in cases with sufficient ramfs but a huge number of maps, there will be lots of files on disk (the more merges in ramfs, the more files on disk). A premature merge under such conditions will definitely help.
2) low io.sort.factor : In such cases the on-disk merge will kick in faster. One on-disk merge requires 2*io.sort.factor - 1 files to be on disk.
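The rule that one on-disk merge needs 2*io.sort.factor - 1 files can be made concrete with a little arithmetic. The following is only a minimal sketch of that rule, not the code from the patch: the class DiskMergeTrigger and its methods are hypothetical names, and the lower bound of 2 on io.sort.factor is an assumption made for the example.

```java
// Hypothetical helper, not Hadoop code: models the trigger rule
// "one on-disk merge requires 2*io.sort.factor - 1 files on disk".
final class DiskMergeTrigger {
    private DiskMergeTrigger() {}

    /** Number of on-disk files needed before one merge starts. */
    static int mergeThreshold(int ioSortFactor) {
        // Assumption for this sketch: a merge of fewer than 2 files is meaningless.
        if (ioSortFactor < 2) {
            throw new IllegalArgumentException("io.sort.factor must be at least 2");
        }
        return 2 * ioSortFactor - 1;
    }

    /** True once enough map-output files are on disk to start a merge. */
    static boolean shouldStartMerge(int filesOnDisk, int ioSortFactor) {
        return filesOnDisk >= mergeThreshold(ioSortFactor);
    }

    public static void main(String[] args) {
        // The two io.sort.factor values used in the benchmark runs in this thread.
        for (int factor : new int[] {10, 100}) {
            System.out.println("io.sort.factor=" + factor
                    + " -> merge starts at " + mergeThreshold(factor) + " files on disk");
        }
    }
}
```

Under this rule io.sort.factor = 10 needs 19 files on disk before a merge starts, while io.sort.factor = 100 needs 199, which is consistent with the observation that on-disk merges rarely occur at the higher setting.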
This proposal will definitely improve the sorting phase on the reducer side.
I have one suggestion. Instead of starting a merge greedily as soon as a Reduce has
collected io.sort.factor number of segments on disk, you should wait
until the Reducer has collected close to 2 * io.sort.factor - 1 number of segments.
This way, you can always choose the io.sort.factor number of smallest segments
to merge and avoid unnecessarily merging large segments.
This will also result in more balanced final segments.
I suspect multiple merging threads may not buy you too much since disk i/o will
be the bottleneck of a merging thread, unless a single merging cannot fully
utilize all the available disk resources.
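The suggestion above, to wait for about 2 * io.sort.factor - 1 segments and then merge the io.sort.factor smallest ones, could be sketched roughly as follows. This is an illustration under stated assumptions, not the patch itself: SmallestSegmentPicker is a hypothetical name, segments are represented only by their sizes in bytes, and the sketch waits for exactly 2 * io.sort.factor - 1 segments rather than "close to" that number.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not Hadoop code: picks which on-disk segments to merge next.
final class SmallestSegmentPicker {
    private SmallestSegmentPicker() {}

    /**
     * Returns the sizes of the io.sort.factor smallest segments once at least
     * 2 * io.sort.factor - 1 segments are on disk; otherwise returns an empty
     * list, meaning "keep waiting".
     */
    static List<Long> pickSegmentsToMerge(List<Long> segmentSizes, int ioSortFactor) {
        if (ioSortFactor < 1) {
            throw new IllegalArgumentException("io.sort.factor must be positive");
        }
        int threshold = 2 * ioSortFactor - 1;
        if (segmentSizes.size() < threshold) {
            return Collections.emptyList();
        }
        // Sort a copy by size so the caller's list is left untouched.
        List<Long> sorted = new ArrayList<>(segmentSizes);
        Collections.sort(sorted);
        // The first io.sort.factor entries are the smallest segments.
        return new ArrayList<>(sorted.subList(0, ioSortFactor));
    }
}
```

With io.sort.factor = 10, this returns nothing until 19 segments are on disk and then returns the 10 smallest, leaving the 9 largest segments for a later pass.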