Did you mean that a prerequisite is ...? What I need is a class that takes in a set of pathnames and creates splits according to the locality of all blocks in the input set. So my requirements do not affect your proposed changes to FileInputFormat.getSplits().
First version of the patch for early review comments.
The above patch works only if BlockLocations have topology information, i.e. getTopologyPaths() returns valid data and not String[0]. Since getTopologyPaths() is not guaranteed to always return rack information (for example, on Kosmos FS), this patch should handle that case as well.
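A minimal sketch of that fallback, assuming a hypothetical `BlockInfo` holder for one block's topology paths (in Hadoop this data would come from `BlockLocation.getTopologyPaths()`). Blocks with no topology information are all placed on one assumed placeholder rack, `/default-rack`, so the combining logic still runs, just without rack distinctions. This is an illustration, not the patch's code.

```java
import java.util.*;

/** Hypothetical stand-in for one block's location data. */
final class BlockInfo {
    final String name;
    final String[] topologyPaths; // e.g. "/rack1/host1:50010"; may be empty

    BlockInfo(String name, String[] topologyPaths) {
        this.name = name;
        this.topologyPaths = topologyPaths;
    }
}

final class RackIndex {
    /** Assumed placeholder rack used when a file system reports no topology. */
    static final String DEFAULT_RACK = "/default-rack";

    /** "/rack1/host1:50010" -> "/rack1"; a path without a rack prefix -> DEFAULT_RACK. */
    static String rackOf(String topologyPath) {
        int slash = topologyPath.lastIndexOf('/');
        return slash > 0 ? topologyPath.substring(0, slash) : DEFAULT_RACK;
    }

    /** Builds rack -> blocks, putting blocks without topology info on DEFAULT_RACK. */
    static Map<String, List<BlockInfo>> racksToBlocks(List<BlockInfo> blocks) {
        Map<String, List<BlockInfo>> result = new TreeMap<>();
        for (BlockInfo block : blocks) {
            Set<String> racks = new TreeSet<>();
            for (String path : block.topologyPaths) {
                racks.add(rackOf(path));
            }
            if (racks.isEmpty()) {
                racks.add(DEFAULT_RACK); // e.g. getTopologyPaths() returned String[0]
            }
            for (String rack : racks) {
                result.computeIfAbsent(rack, r -> new ArrayList<>()).add(block);
            }
        }
        return result;
    }
}
```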
Incorporated Jothi's review comments. Also, enhanced the API to allow specifying a minimum size, maximum size and PathFilters for creating splits.
Implemented a unit test.
This patch combines multiple blocks from different files in the input set into the same split. A minimum split size can be specified, in which case blocks that are not from the same rack may be combined into a single split. A maximum size can be specified, in which case blocks from the same rack are combined up to this limit. A user can specify PathFilter(s) to identify pools of blocks that can be combined together.
A few comments:
If we are combining splits by nodes, wouldn't it make sense to also sort the nodes by rack first (and only then by number of blocks)? That way, blocks that cannot be combined within a given node could be combined with other blocks in the same rack.
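The suggested ordering can be expressed as a two-key comparator. A minimal sketch, assuming a hypothetical `NodeBlocks` summary per node (host, rack, number of input blocks); these names are illustrative and do not come from the patch.

```java
import java.util.*;

/** Hypothetical per-node summary: the node's rack and how many input blocks it holds. */
final class NodeBlocks {
    final String host;
    final String rack;
    final int blockCount;

    NodeBlocks(String host, String rack, int blockCount) {
        this.host = host;
        this.rack = rack;
        this.blockCount = blockCount;
    }
}

final class NodeOrder {
    /** Keeps nodes of the same rack adjacent, and within a rack visits the busiest nodes first. */
    static final Comparator<NodeBlocks> BY_RACK_THEN_BLOCKS =
        Comparator.comparing((NodeBlocks n) -> n.rack)
                  .thenComparing(Comparator.comparingInt((NodeBlocks n) -> n.blockCount).reversed());

    /** Returns host names in the order a node pass would visit them. */
    static List<String> orderedHosts(List<NodeBlocks> nodes) {
        List<NodeBlocks> sorted = new ArrayList<>(nodes);
        sorted.sort(BY_RACK_THEN_BLOCKS);
        List<String> hosts = new ArrayList<>();
        for (NodeBlocks n : sorted) {
            hosts.add(n.host);
        }
        return hosts;
    }
}
```

Leftover blocks from one node are then followed directly by other nodes of the same rack, which is what makes combining them within the rack straightforward.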
    + if (minSize == 0 || curSplitSize > minSize || !iter.hasNext()) {
It seems iter.hasNext() is being used to detect the end of the loop, but iter.hasNext() can be false even in the middle of the loop, right? (The way I am reading it, iter is being used to 'seek' to the current rack (or node) in the nodeToBlocks array, based on sorting by number of blocks in the rack.)
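One way to avoid tying termination to iterator position is to loop while any block is still unassigned. A simplified sketch based on the behaviour described in this thread, not the patch's code, assuming a hypothetical `Blk` record (unique name, size in bytes) and an already-built rack-to-blocks map: each pass over the racks builds at most one split per rack, and blocks that cannot reach the rack minimum are spilled and combined only after all racks are processed. A maximum of 0 is treated as unlimited.

```java
import java.util.*;

/** Hypothetical block record: a unique name and a length in bytes. */
final class Blk {
    final String name;
    final long size;
    Blk(String name, long size) { this.name = name; this.size = size; }
}

final class RackPasses {
    static List<List<String>> combine(Map<String, List<Blk>> racksToBlocks,
                                      long minRack, long maxSize) {
        long max = (maxSize == 0) ? Long.MAX_VALUE : maxSize; // 0 means "no limit"
        Set<String> all = new HashSet<>();
        racksToBlocks.values().forEach(list -> list.forEach(b -> all.add(b.name)));

        Set<String> assigned = new HashSet<>();
        List<List<String>> splits = new ArrayList<>();
        List<Blk> spilled = new ArrayList<>();

        while (assigned.size() < all.size()) {           // loop while work remains
            for (List<Blk> rackBlocks : racksToBlocks.values()) {
                List<Blk> current = new ArrayList<>();
                long size = 0;
                for (Blk b : rackBlocks) {
                    if (assigned.contains(b.name)) continue; // taken by another rack
                    current.add(b);
                    size += b.size;
                    if (size >= max) break;                  // split has reached max
                }
                if (current.isEmpty()) continue;
                for (Blk b : current) assigned.add(b.name);
                if (size >= minRack) {
                    splits.add(names(current));              // rack-local split
                } else {
                    spilled.addAll(current);                 // too small: combine later
                }
            }
        }

        // After all racks: pack spilled blocks into splits that stop once they reach max.
        List<Blk> pending = new ArrayList<>();
        long pendingSize = 0;
        for (Blk b : spilled) {
            pending.add(b);
            pendingSize += b.size;
            if (pendingSize >= max) {
                splits.add(names(pending));
                pending = new ArrayList<>();
                pendingSize = 0;
            }
        }
        if (!pending.isEmpty()) splits.add(names(pending));
        return splits;
    }

    private static List<String> names(List<Blk> blocks) {
        List<String> out = new ArrayList<>();
        for (Blk b : blocks) out.add(b.name);
        return out;
    }
}
```

Because every unassigned block appears in some rack's list, each pass assigns at least one block, so the loop always terminates regardless of how the racks are ordered.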
Instead, we could collect all the overflow blocks from each rack and combine them separately into splits at the end. That would be simpler to understand and code, and I imagine it would not change the overall amount of locality much.
A few other comments:
In general, this is not going to be the last implementation of a multi-file split/format, so it would be good to build the surrounding classes (record readers etc.) in a way that more implementations of a multi-file split can be easily accommodated.
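One way to keep the surrounding classes open to other multi-file split implementations is to have readers depend only on per-chunk accessors (path, offset, length). A minimal sketch with hypothetical `ChunkSplit`, `WholeFileSplit` and `ChunkWalker` classes standing in for the Hadoop types; `WholeFileSplit` illustrates treating a whole-file split as a special case whose offsets are always 0.

```java
import java.util.*;

/** Hypothetical split made of (path, offset, length) chunks. */
class ChunkSplit {
    private final String[] paths;
    private final long[] offsets;
    private final long[] lengths;

    ChunkSplit(String[] paths, long[] offsets, long[] lengths) {
        if (paths.length != offsets.length || paths.length != lengths.length) {
            throw new IllegalArgumentException("array lengths differ");
        }
        this.paths = paths.clone();
        this.offsets = offsets.clone();
        this.lengths = lengths.clone();
    }

    int getNumPaths() { return paths.length; }
    String getPath(int i) { return paths[i]; }
    long getOffset(int i) { return offsets[i]; }
    long getLength(int i) { return lengths[i]; }

    /** Total bytes covered by the split. */
    long getLength() {
        long total = 0;
        for (long len : lengths) total += len;
        return total;
    }
}

/** Whole-file split: every chunk starts at offset 0. */
class WholeFileSplit extends ChunkSplit {
    WholeFileSplit(String[] paths, long[] lengths) {
        super(paths, new long[paths.length], lengths); // new long[n] is all zeros
    }
}

final class ChunkWalker {
    /** Stand-in for a generic reader: lists the chunks it would open, in order. */
    static List<String> describe(ChunkSplit split) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < split.getNumPaths(); i++) {
            out.add(split.getPath(i) + "@" + split.getOffset(i) + "+" + split.getLength(i));
        }
        return out;
    }
}
```

Since `ChunkWalker` sees only the base-class accessors, a new split layout needs a new subclass, not a new reader.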
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12394713/CombineMultiFile3.patch against trunk revision 720930.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 3 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
-1 findbugs. The patch appears to introduce 4 new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3662/testReport/
This message is automatically generated.

I agree with Joydeep that if this input format is intended for handling a large number of files, then the changes should be made in MultiFileInputFormat itself, rather than introducing a new class. However, a quick glance at the patch indicates that this input format works for files having more than one block.
I think we should introduce the file[] -> blocks[] -> {hosts[], racks[]} -> splits[] logic in FileInputFormat and use it in both FileInputFormat and MultiFileInputFormat. I have opened an issue for a concrete MultiFileInputFormat implementation (HADOOP-4741) and attached a patch which I developed some time ago. HADOOP-4741 depends on this issue.
Incorporated review comments:
1. Inherit from MultiFileInputFormat.
2. Do not combine spilled blocks with blocks from the next rack. Instead, after all racks are processed, combine all spilled blocks into a single split.
3. Renamed nodesToBlocks to racksToBlocks.
4. CombineFileInputFormat is an abstract class. CombineFileRecordReader can be used as a RecordReader.
5. The sorting is required because we want the algorithm to be capable of creating large splits. If block B1 is in racks R1, R2 and R3 and block B2 is in racks R1 and R4, then we want to process rack R1 before rack R4. If we process R1 first, then blocks B1 and B2 can be combined into a single split. (If instead we process rack R4 first, then one split will contain B2 while another split will have B1.) The earlier patch created all the splits it could (as many or as few) in a single rack before moving on to other racks. This caused most large splits to be localised on the first few racks in the target set.
In this patch, each iteration tries to create one split per rack. Once all racks have been processed, the loop starts over again. This distributes the rack locations of the created splits uniformly.
Comments:
Looking at the APIs, it actually seems that MultiFileSplit could be treated as a special implementation (extension) of CombineFileSplit, with the get*offset functions always returning 0. Any thoughts on this? I am not that familiar with the miniMR stuff; it would be good if someone else could review the test code as well.
1. Made the setXXXsize methods override the parameters specified in the config file. (The earlier version used the more stringent constraint if a value was set both in the config file and via the setXXXsize methods.)
2. I did not change OneFileInfo because it encapsulates the information for each file; however, I removed one unused field from it. It is used only by the client at the time of creating splits.
3. Removed the variable doIterate and replaced it with a loop that runs as long as there are blocks to process.
4. CombineFileSplit is now a superclass of MultiFileSplit. This allows CombineFileRecordReader to operate on MultiFileSplit as well.

+1 overall.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 3 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.

It would be nice if Enis could review this as well, because this patch modifies MultiFileSplit too.
Sorry for bringing this up so late, but should we be concerned that we are only going to use rack locality?
The reason I am bringing this up is that, from a usage point of view, this may complicate how we use it. Ideally we would always use the CMFIF with some target split sizes, but now I wonder whether we should use CMFIF only in more extreme cases (like too many small files, or too much data with high selectivity), because otherwise we might lose node locality for not much gain (since in the majority of cases the current input formats are just fine). Thoughts? (I am still making my way through the code one last time.)
Nitpick:
CombineFileRecordReader seems to contain an unnecessary recursive call, which can easily be eliminated:

    public boolean next(K key, V value) throws IOException {
      // Keep switching to the next underlying reader until one yields a record.
      while ((curReader == null) || !curReader.next(key, value)) {
        if (!initNextRecordReader()) {
          return false; // no more readers to try
        }
      }
      return true;
    }

I am OK with using only rack locality. I think most statistics have shown that intra-rack communication is nearly free, especially when we are reading big blocks of data.

The file size comment is interesting. Part of the reason why we are interested in this patch is the case where there are lots of small files, so this actually highlights the need to preserve node locality. (Also, Matei reported a pretty significant difference between node and rack locality on our test cluster. That is also troubling me, but we can follow up on it separately.)
Also, I think the code is already structured in a way that makes node locality very easy: as we walk the blocks in a rack and organize them into splits, all we need to do is order them by hostname and we would be done. Thoughts? If we can preserve node locality, then there is never any reason to use the base input formats; we can always use the combine input format and let it organize splits optimally. Otherwise, the lack of node locality is something we are always going to struggle with.

Nit: in CombineFileRecordReader, the rrConstructor signature

    + static final Class [] constructorSignature = new Class []
    +   {InputSplit.class, Configuration.class, Reporter.class, Integer.class};

can be tightened to:

    + static final Class [] constructorSignature = new Class []
    +   {CombineFileSplit.class, Configuration.class, Reporter.class, Integer.class};

Looks good otherwise.
My 2 cents:
CombineFileRecordReader should have a javadoc explaining the basic use case. The changes in MultiFileSplit seem fine. It would be better to use the o.a.h.mapred.lib package for this InputFormat. CombineFileInputFormat makes MultiFileInputFormat obsolete, with only minor differences. We should deprecate MFIF and MFIS.
In its current form, FileInputFormat does not return splits larger than the block size unless we set mapred.min.split.size. However, when it does, it could benefit from node/rack locality.
Incorporated all review comments.
@Zheng: I removed the recursion. Can you please review this method once again? Thanks.
Do you know if this new input format will work well with Hadoop Archives (HAR files)? I ask because the main use of HARs is to pack lots of small files into a few HDFS files, to reduce memory pressure on the namenode, so it seems like a good candidate for combining multiple files into splits.
I had a quick look at the patch and have one comment (sorry to come to this late). The test should use asserts to check that the split locations are as expected, rather than printing to System.out.
Looks pretty good to me.
One question about the use of min and max. The logic for splitting at the rack level makes sense: if the rack total is < max but > min, then there's a split. But the logic for splitting by node doesn't have this check; it always looks for > max. I think what might make sense is to have three thresholds:
1. minNodeSplit
2. minRackSplit
3. maxSplit
That is, anything more than minNodeSplit causes a split at the node level (before combining with the next node), anything more than minRackSplit causes a split at the rack level (before combining across racks), and we never go beyond maxSplit. These three are likely to be strictly ordered (a smaller split is bad, but is worth it to maximize locality, and node locality is worth more than rack locality). I don't know how we would come up with these numbers, though!
One small simplification: I think maxSize = 0 is essentially being treated as infinity. Setting maxSize to MAXINT when it is 0 would simplify the logic in some of the iterators.
Incorporated Joydeep's review comments.
I implemented three thresholds as Joydeep suggested.
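A minimal sketch of how the three thresholds could be held and checked, assuming a hypothetical `SplitThresholds` class (not the patch's code). It encodes the ordering implied by the discussion, minNode <= minRack <= max, and the suggestion to treat a maximum of 0 as unlimited by substituting `Long.MAX_VALUE`.

```java
/** Hypothetical holder for the three split-size thresholds. */
final class SplitThresholds {
    final long minNode; // a node's gathered blocks become a split once they reach this
    final long minRack; // a rack's gathered blocks become a split once they reach this
    final long max;     // a split stops growing once it reaches this size

    SplitThresholds(long minNode, long minRack, long maxSize) {
        // Treat maxSize == 0 as "no limit".
        this.max = (maxSize == 0) ? Long.MAX_VALUE : maxSize;
        if (minNode < 0 || minNode > minRack || minRack > this.max) {
            throw new IllegalArgumentException("expected 0 <= minNode <= minRack <= max");
        }
        this.minNode = minNode;
        this.minRack = minRack;
    }

    /** Node pass: emit a node-local split for the bytes gathered so far? */
    boolean emitAtNode(long gathered) {
        return gathered >= minNode;
    }

    /** Rack pass: emit a rack-local split for the bytes gathered so far? */
    boolean emitAtRack(long gathered) {
        return gathered >= minRack;
    }

    /** Stop adding blocks to the current split? */
    boolean full(long gathered) {
        return gathered >= max;
    }
}
```

A node pass would call `emitAtNode()` on the bytes gathered from one node, a rack pass `emitAtRack()`, and both would stop adding blocks once `full()` returns true.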
@Tom: the System.out statements in the unit test are only there to help debugging. The real checks are already in the assert() statements. Also, I do not know what it takes to make this work for HAR files. Do you have a specific test in mind?
Dhruba,
I was thinking that the asserts may need strengthening. For example, in the following code (and other similar places) we should assert that the splits have the expected paths and locations:

    // make sure that each split has different locations
    for (int i = 0; i < splits.length; ++i) {
      CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
      System.out.println("File split(Test1): " + fileSplit);
    }
    assertEquals(splits.length, 2);

Regarding HARs, I don't have a particular test in mind. We probably need a special input format for HARs; that would be a separate issue.
Thanks Dhruba!
Committed revision 740402 for branch 0.20. I will post an enhanced version of the unit test as requested by Tom.
1. Added lots of asserts to the unit test.
2. Improved the formatting of CombineFileSplit.toString().
3. Enhanced TestCombineFileInputFormat.main() so that it can run as a standalone program against a real cluster.
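The review asked for asserts on the expected paths and locations of each split instead of printing them. A hypothetical helper along these lines (plain arrays stand in for the split's accessors; this is not the committed test code) keeps each check to one line and gives a readable failure message. Paths are compared in order; locations are compared as sets, on the assumption that host order is not significant.

```java
import java.util.*;

final class SplitCheck {
    /** Fails with a descriptive message unless paths match in order and locations match as sets. */
    static void checkSplit(String label,
                           String[] actualPaths, String[] actualLocations,
                           String[] expectedPaths, String[] expectedLocations) {
        if (!Arrays.equals(actualPaths, expectedPaths)) {
            throw new AssertionError(label + ": paths " + Arrays.toString(actualPaths)
                + " != expected " + Arrays.toString(expectedPaths));
        }
        Set<String> actual = new TreeSet<>(Arrays.asList(actualLocations));
        Set<String> expected = new TreeSet<>(Arrays.asList(expectedLocations));
        if (!actual.equals(expected)) {
            throw new AssertionError(label + ": locations " + actual
                + " != expected " + expected);
        }
    }
}
```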
+1 overall.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 3 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.
+1 release audit. The applied patch does not increase the total number of release audit warnings.

Tom: I would appreciate it a lot if you could review the new patch that enhances the unit test.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12399403/TestCombine.txt against trunk revision 740532.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 3 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
+1 core tests. The patch passed core unit tests.
-1 contrib tests. The patch failed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3795/testReport/
This message is automatically generated.

+1
The enhanced test looks good. (One minor comment: it could be made more concise by introducing a method that checks a split against its expected properties. But that's not absolutely necessary.) Thanks for writing it, Dhruba.
Integrated in Hadoop-trunk #756 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/756/)
Edit release notes for publication.
HADOOP-3293.