Hadoop Common
HADOOP-4565

MultiFileInputSplit can use data locality information to create splits

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.20.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Improved MultiFileInputFormat so that multiple blocks from the same node or same rack can be combined into a single split.

      Description

      The MultiFileInputFormat takes a set of paths and creates splits based on file sizes. Each split contains a few files, and the splits are roughly equal in size. It would be more efficient if we could extend this InputFormat to create splits such that all the blocks in one split are either node-local or rack-local.
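      (For illustration only; this sketch is not part of the original issue text. It shows the kind of split such an input format would hand to the job: one split describing several file ranges together with the nodes or racks they are local to. The class and field names are illustrative, not the patch's actual classes.)

        import org.apache.hadoop.fs.Path;

        // Illustrative sketch: one split spanning ranges of several files, plus the
        // locations (hosts or racks) that all of those ranges are local to.
        public class LocalityAwareSplit {
          private final Path[] paths;        // one entry per file chunk in the split
          private final long[] startOffsets; // start offset of each chunk in its file
          private final long[] lengths;      // length of each chunk
          private final String[] locations;  // hosts/racks the chunks are local to

          public LocalityAwareSplit(Path[] paths, long[] startOffsets,
                                    long[] lengths, String[] locations) {
            this.paths = paths;
            this.startOffsets = startOffsets;
            this.lengths = lengths;
            this.locations = locations;
          }

          public String[] getLocations() { return locations; }
        }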

      Attachments

      1. TestCombine.txt
        21 kB
        dhruba borthakur
      2. CombineMultiFile9.patch
        49 kB
        dhruba borthakur
      3. CombineMultiFile8.patch
        47 kB
        dhruba borthakur
      4. CombineMultiFile7.patch
        42 kB
        dhruba borthakur
      5. CombineMultiFile5.patch
        39 kB
        dhruba borthakur
      6. CombineMultiFile4.patch
        36 kB
        dhruba borthakur
      7. CombineMultiFile3.patch
        35 kB
        dhruba borthakur
      8. CombineMultiFile2.patch
        37 kB
        dhruba borthakur
      9. CombineMultiFile.patch
        19 kB
        dhruba borthakur

          Activity

          dhruba borthakur added a comment -

          This requires that a JobClient has access to the network topology. A prerequisite is HADOOP-3293.

          Jothi Padmanabhan added a comment -

          This requires that a JobClient has access to the network topology. A prerequisite is HADOOP-3293.

          Did you mean, a prerequisite is HADOOP-4567? In HADOOP-3293, I am proposing to modify FileInputFormat.getSplits to return the correct hosts during the creation of FileSplits(). Since MultiFileInputFormat overrides getSplits, it might not be able to use the changes in FileInputFormat.java. No?

          dhruba borthakur added a comment -

          What I need is a class that takes in a set of pathnames and creates splits according to the locality of all blocks in the input set. So, my requirement does not affect your proposed changes to FileInputFormat.getSplits().

          dhruba borthakur added a comment -

          First version of the patch for early review comments.

          Jothi Padmanabhan added a comment -

          The above patch will work only if BlockLocations has topology information. i.e. getTopologyPaths() returns valid data and not String[0]. Since it is not guaranteed that getTopologyPaths will always return rack information (for example, Kosmos FS), this patch should handle that case as well.
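          (A sketch of the defensive handling suggested above, for illustration only and not the patch's code: when the file system cannot supply topology paths, fall back to host names alone.)

            import java.io.IOException;
            import org.apache.hadoop.fs.BlockLocation;

            // Illustrative fallback: prefer rack-aware topology paths when the file
            // system provides them, otherwise group by host only.
            public class LocationHelper {
              static String[] locationsFor(BlockLocation block) throws IOException {
                String[] topology = block.getTopologyPaths(); // may be empty, e.g. on KFS
                if (topology != null && topology.length > 0) {
                  return topology;                            // entries look like /rack/host
                }
                return block.getHosts();                      // fall back to node names
              }
            }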

          dhruba borthakur added a comment -

          Incorporated Jothi's review comments. Also, enhanced the API to allow specifying a minimum size, maximum size and PathFilters for creating splits.

          dhruba borthakur added a comment -

          Implemented a unit test.

          This patch combines multiple blocks from different files in the input set to be part of the same split.

          A minimum split size can be specified, in which case blocks that are not from the same rack may be combined into a single split. A maximum size can be specified, in which case blocks from the same rack are combined up to this limit. A user can specify PathFilter(s) to identify pools of blocks that can be combined together.
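          (A hedged usage sketch of the knobs described above. The method names setMaxSplitSize, setMinSplitSizeRack and createPool follow the class as it was eventually committed to org.apache.hadoop.mapred.lib; treat the exact signatures as assumptions rather than this particular patch's API.)

            import java.io.IOException;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.fs.PathFilter;
            import org.apache.hadoop.mapred.InputSplit;
            import org.apache.hadoop.mapred.JobConf;
            import org.apache.hadoop.mapred.RecordReader;
            import org.apache.hadoop.mapred.Reporter;
            import org.apache.hadoop.mapred.lib.CombineFileInputFormat;

            // Sketch of a subclass that sets the size bounds and one pool filter.
            public class MyCombineFormat extends CombineFileInputFormat<Object, Object> {
              public MyCombineFormat(JobConf job) {
                setMaxSplitSize(512L * 1024 * 1024);     // never build a split larger than this
                setMinSplitSizeRack(128L * 1024 * 1024); // combine across racks only below this
                // Only blocks from files accepted by the same pool may share a split.
                createPool(job, new PathFilter() {
                  public boolean accept(Path p) { return p.getName().endsWith(".log"); }
                });
              }

              public RecordReader<Object, Object> getRecordReader(
                  InputSplit split, JobConf job, Reporter reporter) throws IOException {
                throw new UnsupportedOperationException("record reader omitted in this sketch");
              }
            }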

          Joydeep Sen Sarma added a comment -

          a few comments:

          • can u explain whether the blocks are split by racks or nodes? (the data structure says nodeToBlock etc. - but all the comments refer to 'racks')

          if we are combining splits by nodes - then wouldn't it make sense to also sort the nodes by racks first (and perhaps only then by number of blocks)? (so that we can combine blocks that cannot be combined within a given node with other blocks in the same rack?)

          • in getMoreSplits() - i didn't understand:

          + if (minSize == 0 || curSplitSize > minSize || !iter.hasNext()) {

          it seems iter.hasNext() is being used to try to detect the end of the loop - but iter.hasNext() can be false even in the middle of the loop - right? (the way i am reading it - iter is being used to 'seek' to the current rack(/node) in the nodeToBlocks array based on sorting by number of blocks in rack)

          • somewhat confused by how overflow blocks (curSplitSize < minSize) are being handled. looks like with current scheme - if there is even one overflow block from current rack - then it will be combined with blocks available from the next rack. this seems to have some issues:
          • the racks list is going to have both the racks - but the blocks are probably overwhelmingly dominated by the next rack
          • the racks list is not cleared after the overflow blocks are dealt with in the first split created on the next rack. so the next set of splits will all have the previous rack in the racks list unnecessarily (I presume this will lead to incorrect inference about the locality of splits)

          instead - we could have collected all the overflow blocks from each rack and just combined them separately into splits at the end. would be simpler to understand/code and not change the overall amount of locality much i imagine

          Joydeep Sen Sarma added a comment -

          a few other comments:

          • do we think this patch totally supersedes multifileinputformat/multifilesplit?
          • if not - should CombineFileSplit extend MultiFileSplit? (the argument being that in that case CombineFileRecordReader can work for both MultiFileSplit and CombineFileSplit).

          In general - this is not going to be the last implementation of a multifilesplit/format - so it would be good to have the surrounding classes (recordreaders etc.) be built in a way that more implementations of a multifilesplit can be easily accommodated.

          • CombineFileInputFormat does not implement getRecordReader (throws an exception) - shouldn't it just be an abstract class then?
          • one of the bigger problems with MultiFileInputFormat was the lack of concrete implementations. I think it just makes sense to provide a full implementation of combinefileinputformat for text files (and perhaps sequencefiles) at least that can be used without writing code by lay users.
          • as an aside - i don't understand now why sorting racks/nodes by number of blocks matters at all. for each rack/node - one would coalesce blocks into splits. what overflows goes into a miscellaneous bucket. this protocol does not depend on walking through the racks/nodes in a particular order. what seems more important is that overflow blocks are first combined by rack (but i am confused about the whole rack vs. node thing)
          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12394713/CombineMultiFile3.patch
          against trunk revision 720930.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 3 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          -1 findbugs. The patch appears to introduce 4 new Findbugs warnings.

          +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

          -1 core tests. The patch failed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3662/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3662/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3662/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3662/console

          This message is automatically generated.

          Enis Soztutar added a comment -

          I agree with Joydeep that if this InputFormat is intended for handling a large number of files, then the changes should be done in MultiFileInputFormat itself, rather than introducing a new class. However, a quick glance at the patch indicates that this InputFormat also works for files having more than one block.
          I guess we should introduce the file[] -> blocks[] -> {hosts[], racks[]} -> splits[] logic in FileInputFormat and use this logic in both FileInputFormat and MultiFileInputFormat.

          I have opened an issue for a concrete MultiFileInputFormat implementation (HADOOP-4741), and attached a patch which I developed some time ago. HADOOP-4741 depends on this issue.

          dhruba borthakur added a comment -

          Incorporate review comments.

          dhruba borthakur added a comment -

          1. Inherit from MultiFileInputFormat.
          2. Do not combine spilled blocks with blocks from the next rack. Instead, after all racks are processed, combine all spilled blocks into a single split.
          3. Renamed nodesToBlocks to be racksToBlocks.
          4. CombineFileInputFormat is an abstract class. CombineFileRecordReader can be used as a RecordReader.
          5. The sorting is required because we want the algorithm to be capable of creating large splits. If block B1 is in racks R1, R2 and R3 and block B2 is in racks R1 and R4, then we want to process rack R1 before rack R4. If we process R1 first, then blocks B1 and B2 can be combined into a single split. (Instead, if we process rack R4 first, then one split will contain B2 while another split will have block B1.)
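          (A small illustrative helper for the ordering described in point 5; this is not the patch's code, and the rack-to-blocks map type is an assumption.)

            import java.util.ArrayList;
            import java.util.Collections;
            import java.util.Comparator;
            import java.util.List;
            import java.util.Map;

            // Illustrative: visit racks with the most blocks first, so blocks that are
            // replicated on several racks get a chance to join the largest splits.
            public class RackOrder {
              static List<String> racksByBlockCount(final Map<String, List<String>> rackToBlocks) {
                List<String> racks = new ArrayList<String>(rackToBlocks.keySet());
                Collections.sort(racks, new Comparator<String>() {
                  public int compare(String a, String b) {
                    return rackToBlocks.get(b).size() - rackToBlocks.get(a).size();
                  }
                });
                return racks;
              }
            }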

          dhruba borthakur added a comment -

          The earlier patch created as many (or as few) splits as it could in a single rack before moving to other racks. This caused the problem that most large splits were localised on the first few racks in the target set.

          In this patch, each iteration tries to create one split per rack. Once all racks are processed once, the loop starts all over again. This distributes the rack locations of the splits in a nice, uniform manner.

          Joydeep Sen Sarma added a comment -

          Comments:

          • min/maxSplitSize - i found the initialization confusing. is it the intent that values supplied by setXXXSplitSize take precedence over config values? (I would find that logical - but the implementation seems different)
          • OneFileInfo - it seems to me that this class has no use except that its constructor has a side-effect of populating some external structures. As a style issue - this seems unnecessary - we could just have a simple helper function instead.
          • for(boolean doIterate ..): i found this variable pretty confusing. could we just replace doIterate with blockToNodes.size()>0 ?
          • one of the comments from the last review was that CombineFileSplit would be nice to treat as an extension of MultiFileSplit. I would still make that argument - there should be a single base class/interface for these multisplit classes - because then the CombineFileRecordReader can be made to work with all of them.

          Looking at the apis - it actually seems that MultiFileSplit can perhaps be treated as a special implementation (extension) of CombineFileSplit (with get*offset functions always returning 0).

          Any thoughts on this?
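          (A hedged sketch of the relationship suggested above, using the CombineFileSplit constructor as it exists in the eventually committed class; the exact signature is an assumption.)

            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.mapred.JobConf;
            import org.apache.hadoop.mapred.lib.CombineFileSplit;

            // Illustrative: a split over whole files is just a combined split whose
            // chunks all start at offset 0, so get*Offset-style accessors return 0.
            public class WholeFileCombineSplit extends CombineFileSplit {
              public WholeFileCombineSplit(JobConf job, Path[] files, long[] lengths) {
                super(job, files,
                      new long[files.length], // all start offsets are 0
                      lengths,
                      new String[0]);         // locations left for the input format to fill
              }
            }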

          Not that familiar with miniMR stuff - would be good if someone else can review the test code as well ..

          dhruba borthakur added a comment -

          1. Made setXXXsize methods override the parameters specified in the config file. (the earlier version used to take the more stringent constraint if it was set in both the config file and via setXXXsize methods).
          2. I did not change OneFileInfo because it seems to encapsulate the information for each file. However, i cleaned up one unused field in this object. This is used only by the client at the time of creating splits.
          3. Removed the variable doIterate; the code now loops as long as there are blocks to process.
          4. CombineFileSplit is now a superclass of MultiFileSplit. This allows CombineFileRecordReader to operate on MultiFileSplit as well.

          dhruba borthakur added a comment -

          [exec] +1 overall.
          [exec] +1 @author. The patch does not contain any @author tags.
          [exec] +1 tests included. The patch appears to include 3 new or modified tests.
          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
          [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
          [exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

          dhruba borthakur added a comment -

          It would be nice if Enis can review this as well because this patch modified MultiFileSplit as well.

          Joydeep Sen Sarma added a comment -

          sorry for bringing this up so late - but should we be concerned that we are going to only use rack locality?

          the reason i am bringing this up is that, from a usage point of view, this may complicate how we use this. i.e. - ideally we would just always use CMFIF with some target split sizes - but now there is a question in my head about whether we should use CMFIF only in more extreme cases (like too many small files, or too much data and high selectivity) - because otherwise we might give up node locality for not much gain (since in the majority of cases the current inputformats are just fine)

          thoughts?

          (still making my way through the code one last time).

          Zheng Shao added a comment -

          Nitpick:

          CombineFileRecordReader seems to contain an unnecessary recursive call, which can be easily eliminated.

            public boolean next(K key, V value) throws IOException {
              while ((curReader == null) || !curReader.next(key, value)) {
                if (!initNextRecordReader()) {
                  return false;
                }
              }
              return true;
            }
          

          I am OK with using only rack locality. I think most statistics have shown that intra-rack communication is nearly free, especially when we are reading big blocks of data.

          Joydeep Sen Sarma added a comment -

          the file size comment is interesting. part of the reason why we are interested in this patch is because of cases where there are lots of small files. So this actually would highlight the need to preserve node locality. (also matei reported a pretty significant difference in node/rack locality on our test cluster - that's also troubling me - we can follow that up separately).

          also - the code is already structured in a way that makes node locality very easy i think (i would hazard that as we walk the blocks in a rack and organize them into splits - all we need to do is order them by hostname and we would be done). thoughts?

          if we can preserve node locality - then there is never any reason to use the base inputformats. we can always use the combineinputformat and let it organize splits optimally. otherwise not having node locality is something we are always going to struggle with.

          nit: CombineFileRecordReader - rrConstructor.

          + static final Class [] constructorSignature = new Class []
                {InputSplit.class, Configuration.class, Reporter.class, Integer.class};

          can be tightened to:

          + static final Class [] constructorSignature = new Class []
                {CombineFileSplit.class, Configuration.class, Reporter.class, Integer.class};

          looks good otherwise.

          Enis Soztutar added a comment -

          My 2 cents:
          CombineFileRecordReader should have a javadoc explaining the basic use case.
          The changes in MultiFileSplit seem fine.
          It would be better to use the o.a.h.mapred.lib package for this InputFormat.
          CombineFileInputFormat makes MultiFileInputFormat obsolete, with few differences. We should deprecate MFIF and MFIS.

          if we can preserve node locality - then there is never any reason to use the base inputformats. we can always use the combineinputformat and let it organize splits optimally. otherwise not having node locality is something we are always going to struggle with.

          In its current form, FileInputFormat does not return splits larger than the block size, unless we set mapred.min.split.size. However, when it does, it could benefit from node/rack locality.
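          (For reference, a hedged example of the setting mentioned above; the configuration key is the one used in this era of Hadoop, and the value is purely illustrative.)

            import org.apache.hadoop.mapred.JobConf;

            public class MinSplitSizeExample {
              public static void main(String[] args) {
                JobConf job = new JobConf();
                // Ask FileInputFormat for splits of at least 256 MB, i.e. larger than a
                // typical block, which is when node/rack placement starts to matter.
                job.setLong("mapred.min.split.size", 256L * 1024 * 1024);
                System.out.println(job.getLong("mapred.min.split.size", 0));
              }
            }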

          dhruba borthakur added a comment -

          Incorporates all review comments.

          @Zheng: I removed the recursion. Can you please review this method once again? Thanks.
          @Joydeep: I changed the definition of the constructor. I also implemented code that first tries to make splits that are node-local (subject to maxSplitSize). Once this is done, all remaining blocks are combined to create rack-local splits. The idea behind this is that if you specify a maxSplitSize equal to the block size, then you practically get the existing default behaviour for all node-local data.
          @Enis: I moved the new files to mapred.lib and added JavaDocs.

          Tom White added a comment -

          Do you know if this new input format will work well with Hadoop Archives (HAR files)? I wonder because the main use for HARs is to pack lots of small files into a few HDFS files, to reduce memory resource pressure on the namenode, so it seems like a good candidate for combining multiple files into splits.

          I had a quick look at the patch and had one comment (sorry to come to this late). The test should use asserts to check that the split locations are as expected rather than printing to System.out.

          Joydeep Sen Sarma added a comment -

          looks pretty good to me.

          One question about the use of min and max. The logic for splitting at the rack-locality level makes sense - if the rack total is < max but > min - then there's a split.

          but the logic for splitting by node doesn't have this check. it always looks for > max.

          I think what might make sense is to have three thresholds:

          • minNodeSplit
          • minRackSplit
          • maxSplit

          meaning that anything more than minNodeSplit causes split at node level (before combining with next node), anything more than minRackSplit causes split at rack level (before combining across racks) and we never go beyond maxSplit ever. These three are likely to be strictly ordered (a smaller split is bad - but is worth it to maximize locality and node locality is worth more than rack locality).

          i don't know how we would come up with these numbers though!

          One small simplification - i think maxSize = 0 is essentially being treated as infinity. Setting maxSize to MAXINT if it is equal to 0 will simplify the logic in some of the iterators.
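          (A rough sketch of the packing behind these thresholds, for illustration only and not the patch's code: groups never exceed maxSplit, and a trailing group smaller than the relevant minimum would be handed to the next locality level - node, then rack, then global - rather than emitted.)

            import java.util.ArrayList;
            import java.util.List;

            // Illustrative greedy packing of block sizes into splits of at most maxSplit
            // bytes. Assumes every individual block is no larger than maxSplit.
            public class GreedyPacker {
              static List<List<Long>> pack(List<Long> blockSizes, long maxSplit) {
                List<List<Long>> splits = new ArrayList<List<Long>>();
                List<Long> current = new ArrayList<Long>();
                long size = 0;
                for (long b : blockSizes) {
                  if (size > 0 && size + b > maxSplit) {
                    splits.add(current);               // close the split before exceeding maxSplit
                    current = new ArrayList<Long>();
                    size = 0;
                  }
                  current.add(b);
                  size += b;
                }
                if (!current.isEmpty()) {
                  splits.add(current);                 // leftover group; may be below the minimum
                }
                return splits;
              }
            }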

          dhruba borthakur added a comment -

          Incorporated Joydeep's review comments.

          dhruba borthakur added a comment -

          I implemented three thresholds as Joydeep suggested.

          @Tom: the System.out statements in the unit test are only to help debugging. The real checks are already there in the assert() statements. Also, I do not know what it takes to make it work for HAR files. Do you have a specific test in mind?

          Joydeep Sen Sarma added a comment -

          +1 from my side.

          Tom White added a comment -

          Dhruba,

          I was thinking that the asserts may need strengthening. For example, in the following code (and other similar places) we should assert that the splits have the expected paths and locations.

          // make sure that each split has different locations
          for (int i = 0; i < splits.length; ++i) {
            CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
            System.out.println("File split(Test1): " + fileSplit);
          }
          assertEquals(splits.length, 2);
          

          Regarding HARs, I don't have a particular test in mind. We probably need a special input format for HARs - that would be a separate issue.
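          (Returning to the asserts above: a hedged sketch of the kind of stronger check being requested, with hypothetical expected values; file1 and the host name are made-up for illustration.)

            // Illustrative only: assert concrete properties of each split instead of
            // printing them. Expected values here are hypothetical.
            CombineFileSplit first = (CombineFileSplit) splits[0];
            assertEquals(2, splits.length);
            assertEquals(file1.getName(), first.getPath(0).getName()); // expected path
            assertEquals(0, first.getOffset(0));                       // expected start offset
            assertEquals("host1.rack1.com", first.getLocations()[0]);  // expected location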

          Zheng Shao added a comment -

          Thanks Dhruba!

          Committed revision 740402 for branch 0.20.
          Committed revision 740404 for trunk.

          dhruba borthakur added a comment -

          I will post an enhanced version of the unit test as requested by Tom.

          dhruba borthakur added a comment -

          1. Added lots of asserts to the unit test.
          2. Made the CombineFileSplit.toString() better formatted.
          3. Enhanced the TestCombineFileInputFormat.main() so that it can run as a standalone program against a real cluster.

          dhruba borthakur added a comment -

          [exec] +1 overall.
          [exec] +1 @author. The patch does not contain any @author tags.
          [exec] +1 tests included. The patch appears to include 3 new or modified tests.
          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
          [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
          [exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.
          [exec] +1 release audit. The applied patch does not increase the total number of release audit warnings.

          dhruba borthakur added a comment -

          Tom: I would appreciate it a lot if you can review the new patch that enhances the unit test.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12399403/TestCombine.txt
          against trunk revision 740532.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 3 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          -1 contrib tests. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3795/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3795/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3795/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3795/console

          This message is automatically generated.

          Tom White added a comment -

          +1

          The enhanced test looks good. (One minor comment, it could be made more concise by introducing a method to check a split that takes a split and its expected properties. But that's not absolutely necessary.) Thanks for writing it Dhruba.

          dhruba borthakur added a comment -

          I just committed this.

          Hudson added a comment -

          Integrated in Hadoop-trunk #756 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/756/ )
          Robert Chansler added a comment -

          Edit release notes for publication.

          Gaurav Jain added a comment -

          Did you happen to hit the following issue after this jira's changes?

          https://issues.apache.org/jira/browse/HDFS-347

          Looks like I am hitting this issue. I am running some more benchmarks to trace down the details.


            People

            • Assignee:
              dhruba borthakur
            • Reporter:
              dhruba borthakur
            • Votes:
              0
            • Watchers:
              8
