Pig
PIG-1518

multi file input format for loaders

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Feature: combine splits of sizes smaller than the value of the property "pig.maxCombinedSplitSize" or, if "pig.maxCombinedSplitSize" is not set, the file system default block size of the load's location. This feature can be turned off by setting the property "pig.splitCombination" to "false". When such a combination is performed, a log message like "Total input paths (combined) to process : 7" will be logged.

      This feature is applicable when a user input, or an intermediate input, has many small files to be loaded that would otherwise cause many more "under-fed" mappers to be launched, potentially slowing down execution.

      This change will not cause any backward compatibility issue except when a loader implementation makes use of the PigSplit object passed to the prepareToRead method; in that case the loader might need to be rebuilt because PigSplit's definition has been modified. However, we currently know of no external use of the object.

      This change also requires the loader to be stateless across invocations of the prepareToRead method; that is, the method should reset any internal state that is not derived from the RecordReader argument.
      Otherwise, this feature should be disabled.

      In addition, if a loader implements IndexableLoadFunc, or implements OrderedLoadFunc and CollectableLoadFunc, its input splits won't be subject to possible combinations.
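      To make the statelessness requirement concrete, here is a minimal sketch (Python standing in for the Java LoadFunc API; the class and method names are hypothetical, not Pig's actual API) of a loader that resets its per-split state in prepareToRead:

```python
class CountingLoader:
    """Hypothetical loader sketch: under split combination, prepareToRead may
    be called once per underlying split on the SAME loader object, so all
    per-split state must be reset there."""

    def __init__(self):
        self.reader = None
        self.records_read = 0  # per-split state

    def prepare_to_read(self, reader, split):
        # Reset everything not derived from the new reader; a loader that
        # accumulated state across calls would misbehave under combination.
        self.reader = reader
        self.records_read = 0

    def get_next(self):
        # Return the next record from the current reader, or None at end.
        rec = next(self.reader, None)
        if rec is not None:
            self.records_read += 1
        return rec
```

      A counter or buffer left over from the previous prepareToRead call would otherwise corrupt the next split's records when splits are combined.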

      Description

      We frequently run into the situation where Pig needs to deal with many small input files. In this case a separate map task is created for each file, which can be very inefficient.

      It would be great to have an umbrella input format that can take multiple files and use them in a single split. We would like to see this working with different data formats if possible.

      There are already a couple of input formats doing a similar thing: MultiFileInputFormat as well as CombineFileInputFormat; however, neither works with the new Hadoop 20 API.

      We at least want to do a feasibility study for Pig 0.8.0.

      1. PIG-1518-0.7.0.patch
        57 kB
        Justin Sanders
      2. PIG-1518.patch
        58 kB
        Yan Zhou
      3. PIG-1518.patch
        58 kB
        Yan Zhou
      4. PIG-1518.patch
        58 kB
        Yan Zhou
      5. PIG-1518.patch
        58 kB
        Yan Zhou
      6. PIG-1518.patch
        58 kB
        Yan Zhou
      7. PIG-1518.patch
        58 kB
        Yan Zhou
      8. PIG-1518.patch
        58 kB
        Yan Zhou
      9. PIG-1518.patch
        52 kB
        Yan Zhou

        Issue Links

          Activity

          Olga Natkovich created issue -
          Yan Zhou added a comment -

          CombinedInputFormat, in lieu of the deprecated MultiFileInputFormat, batches small files on the basis of block locality. For Pig, this umbrella input format will have to work with generic input formats for which the block info is unavailable but the data node and size info are present to let the M/R framework make scheduling decisions. In other words, Pig cannot
          break the original splits apart to "work inside" them but can only use the original splits as building blocks for the combined input splits.

          Consequently, this combined input format will hold multiple generic input splits so that each combined split's size is bounded by a configured limit of, say, pig.maxsplitsize, with the default value being the HDFS block size of the file system the load source sits in.

          However, due to the sortedness constraint on the tables in a merge join, split combination will not be used for any loads that feed a merge join. For map-side cogroup or map-side group by, though, the splits can be combined because each split is only required to contain all duplicates of a key, and combining splits preserves that invariant.

          During combination, splits on the same data node will be merged as much as possible. Leftovers will be merged without regard to data locality. Of all the used data nodes, those with fewer splits will be merged before those with more splits, so as to minimize the leftovers on the data nodes with fewer splits. On each data node, a greedy approach is adopted so that larger splits are merged before smaller ones, because smaller splits are more easily merged later among themselves.
          As a result, the implementation maintains a list of data hosts sorted by the number of splits, each holding a list of the original splits sorted by split size, to perform the above operations efficiently. The complexity should be linear in the number of original splits.

          Note that for data locality, we just honor whatever the generic input split's getLocations() method produces. Any particular input split's implementation actually may or may not hold that property. For instance, CombinedInputFormat will combine
          node-local or rack-local blocks into a split. Essentially, this PIG container input split works on whatever data locality perception the underlying loader provides.

          On the implementation side, PigSplit will no longer hold a single wrapped InputSplit instance but a new CombinedInputSplit instance. Accordingly, PigRecordReader will hold a list
          of wrapped record readers, not just a single one. Correspondingly, PigRecordReader's nextKeyValue() will use the current wrapped record reader to fetch the next values.
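          The reader-delegation scheme described above can be sketched as follows (Python stand-in for the Java classes; names are hypothetical, not the patch's actual API):

```python
class CombinedRecordReader:
    """Sketch of a reader over a combined split: it holds the wrapped
    readers for the underlying splits and advances to the next reader
    when the current one is exhausted."""

    def __init__(self, readers):
        self.readers = list(readers)  # one reader per underlying split
        self.current = None

    def next_key_value(self):
        # Return the next record, moving across underlying readers,
        # or None once every underlying split has been consumed.
        while True:
            if self.current is None:
                if not self.readers:
                    return None
                self.current = self.readers.pop(0)
            rec = next(self.current, None)
            if rec is not None:
                return rec
            self.current = None  # current reader exhausted; advance
```

          Empty underlying splits are simply skipped as the reader advances, matching the behavior discussed later in this thread.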

          Risks include: 1) the test verifications may need major changes since this optimization may cause major ordering changes in results; 2) since LoadFunc.prepareToRead() takes a PigSplit argument, there might be a backward compatibility issue as PigSplit changes its wrapped input split to the combined input split. But this should be very unlikely, as the only known
          use of the PigSplit argument is the internal "index loader" for the right table in merge join.

          Yan Zhou added a comment -

          In contrast with Hive, where CombineFileInputFormat is used to generate input splits on the underlying storage formats, Pig's combined splits work on top of the splits generated by the underlying loaders. In other words, Hive's input splits are CombineFileSplits that create record readers of the underlying storage formats, while Pig's combined input splits contain the underlying storage's splits.

          CombineFileRecordReader would have been reusable if not for its being supported only in 0.18 and its need for a CombineFileSplit, instead of an InputSplit, as a constructor argument (MAPREDUCE-955).

          Yan Zhou added a comment -

          The combination algorithm currently does not consider rack locality, as the generic underlying input splits do not carry rack info. For more specific input splits like FileSplit, the rack info is available, thus allowing generation of combined splits with consideration of rack locality. But this might be out of scope for 0.8, and a separate JIRA, PIG-1535, has been filed for that purpose.

          Yan Zhou added a comment -

          To provide a safety valve for any input formats that might dislike the combination of their splits, a boolean property pig.splitcombination is to be provided to allow disabling this feature. The default value will be true.

          Yan Zhou added a comment -

          The pseudo code of the combination op is as follows:

          for each node of the nodes (sorted in ascending order of split count) {
              while the node's split list (sorted in descending order of split size) is not empty {
                  find the biggest splits that can be combined with the first split of the list;
                  if the accumulated split size is >= half of the limit {
                      generate a combined split;
                      remove the accumulated splits from the node's split list;
                      clear the accumulated split list;
                  } else {
                      break;
                  }
              }
          }

          // leftover combination
          for each node of the nodes {
              for each split of the node's split list {
                  add the split to a leftover list;
              }
          }

          for each split in the leftover list {
              if the accumulated split size is >= the limit {
                  generate a combined split;
                  remove the accumulated splits;
                  clear the accumulated split list;
              }
              if it is the last split in the leftover list {
                  try to add it to an existing combined split;
                  if that is not possible, generate a combined split from the accumulated splits;
              }
          }

          The complexity is n*log(n), with n being the number of original splits that are smaller than the limit.
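          A simplified executable sketch of the size-based greedy packing (Python; it ignores the per-node locality bookkeeping and the half-limit leftover heuristic, so it is an approximation of the idea rather than the patch's algorithm):

```python
def combine_splits(split_sizes, limit):
    """Greedily pack split sizes into combined splits bounded by `limit`.

    Splits are taken largest-first, mirroring the idea that smaller
    splits are more easily merged later among themselves. A single
    split larger than the limit stays in a group of its own.
    """
    remaining = sorted(split_sizes, reverse=True)
    combined, acc, acc_size = [], [], 0
    for s in remaining:
        # Flush the accumulated group when adding s would exceed the limit.
        if acc and acc_size + s > limit:
            combined.append(acc)
            acc, acc_size = [], 0
        acc.append(s)
        acc_size += s
    if acc:
        combined.append(acc)
    return combined
```

          For example, sizes [60, 50, 30, 20, 10] with a limit of 100 yield the groups [60], [50, 30, 20], [10]: every split ends up in exactly one combined split and no group exceeds the limit.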

          Alan Gates added a comment -

          For map-side cogroup or map-side group by, though, the splits can be combined because each split is only required to contain all duplicates of a key, and combining splits preserves that invariant.

          You are correct for mapside group, but not mapside cogroup. Mapside cogroup does require all files being grouped to be processed in an ordered fashion.

          Yan Zhou added a comment -

          Right, map-side cogroup needs sortedness of the input, but only the "side inputs" need the ability to seek on a key; the "base input" only needs all duplicates of a key to be present in a single mapper. I'll mark the "side inputs" as non-combinable.

          Yan Zhou added a comment -

          During the merge process, any empty splits will be skipped. Currently, empty splits are generated for empty files, which is unnecessary in the first place.

          Yan Zhou added a comment -

          There is a bigger question at hand. The semantics of OrderedLoadFunc is that the splits are totally ordered. BinStorage, InterStorage and PigStorage all implement that interface through FileInputLoadFunc. Since the combination of splits as conceived here will definitely destroy the split ordering, if combination is disabled for these storages the feature would be virtually useless for a majority of use cases.

          On the other hand, I'm seeing no use of the comparison capability except for MergeJoinIndexer's getNext() method, which makes me wonder if the OrderedLoadFunc can be removed from the FileInputLoadFunc. Semantically, FileInputLoadFunc should not support the ordering of splits, as Hadoop's FileInputFormat doesn't. When a need arises like in MergeJoinIndexer, we can add that extension on. But the change may incur some backward compatibility issues.
          I'm now soliciting comments in this area.

          Ashutosh Chauhan added a comment -

          This feature of combining multiple splits should honor the OrderedLoadFunc interface: if a loadfunc implements that interface, then the splits it generates should not be combined. However, it's not clear why FileInputLoadFunc implements this interface. AFAIK, the split[] returned by getSplits() on FileInputFormat makes no guarantee that the underlying splits will be returned in an ordered fashion. Though that is the default behavior right now, and thus implementing OrderedLoadFunc doesn't cause any problem in the current implementation, there seems to be no real benefit to FileInputLoadFunc implementing it (with one exception, to which I will come later). So I will argue that FileInputLoadFunc stop implementing OrderedLoadFunc. This will have the immediate benefit of making this change useful for all the fundamental storage mechanisms of Pig, like PigStorage, BinStorage, InterStorage etc. Dropping an interface from an implementing class can be seen as a backward-incompatible change, but I really doubt anyone cares whether PigStorage is reading splits in an ordered fashion.
          The only real victim of this change will be merge join, which will stop working with PigStorage by default. But we have not seen merge join being used with PigStorage in many places. Second, it is anyway based on an assumption about FileInputFormat, which may choose to change its behavior in the future. Third, the solution to this problem is straightforward: have another loader that extends PigStorage and implements OrderedLoadFunc, which can be used to load data for merge join.

          In essence I am arguing to drop the OrderedLoadFunc interface from FileInputLoadFunc so that this feature is useful for a large number of use cases.

          Yan, you also need to watch out for ReadToEndLoader, which also makes assumptions that may break in the presence of this feature.

          Yan Zhou added a comment -

          Another approach is to mark splits as uncombinable only when necessary. Specifically, MergeJoinIndexer and the base load in mapside cogroup need to be excluded from the split combination.

          Breaking backward compatibility is probably too much of a risk to take. In the meanwhile, OrderedLoadFunc is marked as "evolving", which leaves some headroom for future semantic polish.

          Yan Zhou added a comment -

          One experimental result on a 15-node cluster of 2 x Xeon L5420 2.50GHz/16G RAM boxes is as follows:

          Query:

          register pigperf.jar;
          A = load '/user/pig/tests/data/pigmix/page_views' using org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
          as (user, action, timespent, query_term, ip_addr, timestamp,
          estimated_revenue, page_info, page_links);
          B = foreach A generate user, (double)estimated_revenue;
          B1 = distinct B;
          alpha = load '/user/pig/tests/data/pigmix/users' using PigStorage('\u0001') as (name, phone, address,
          city, state, zip);
          beta = foreach alpha generate name;
          C = join beta by name, B1 by user parallel 300;
          D = group C by $0 parallel 40;
          E = foreach D generate group, SUM(C.estimated_revenue);
          store E into 'spliCombo2.out';

          It creates 3 map/reduce jobs.

          No Split Combination:

                                Mappers   Reducers
          Job 1  number         120       300
                 elapsed time   24s       2m43s
          Job 2  number         301       300
                 elapsed time   46s       3m11s
          Job 3  number         300       40
                 elapsed time   38s       53s
          Total elapsed time: 7m36s

          With Split Combination:

                                Mappers   Reducers
          Job 1  number         120       300
                 elapsed time   22s       2m49s
          Job 2  number         3         300
                 elapsed time   27s       2m46s
          Job 3  number         1         40
                 elapsed time   17s       24s
          Total elapsed time: 7m5s
          Yan Zhou made changes -
          Field Original Value New Value
          Attachment PIG-1518.patch [ 12452408 ]
          Yan Zhou added a comment -

          In summary, split combination is controlled through the following JVM properties:

          pig.maxCombinedSplitSize: specifies the maximum combined split size in bytes; by default, it is the load file system's default block size.

          pig.splitCombination: takes values of "false" and "true". The default is "true". "false" will disable the split combination.
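          A sketch of how these settings might be resolved at run time (Python; the helper name and return convention are illustrative, only the property names and defaults come from the comment above):

```python
def resolve_split_combination(props, default_block_size):
    """Return the effective maximum combined split size in bytes,
    or None when split combination is disabled.

    props: a dict of string properties, e.g. parsed JVM -D options.
    default_block_size: the load file system's default block size.
    """
    if props.get("pig.splitCombination", "true") == "false":
        return None  # feature disabled
    return int(props.get("pig.maxCombinedSplitSize", default_block_size))
```

          For example, with no properties set the limit falls back to the file system block size, and setting pig.splitCombination to "false" disables combination regardless of the size property.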

          Mridul Muralidharan added a comment -

          If the optimizer is turned off, does this also get turned off (pig.splitCombination=false)?

          Yan Zhou added a comment -

          No. It is not part of the optimizer, as it does not transform the logical/physical plans the way the other optimizer rules do.

          Yan Zhou added a comment -

          Style changes, Hudson pass, plus other minor changes. Internal Hudson results:

          [exec] -1 overall.
          [exec]
          [exec] +1 @author. The patch does not contain any @author tags.
          [exec]
          [exec] +1 tests included. The patch appears to include 3 new or modified tests.
          [exec]
          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
          [exec]
          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
          [exec]
          [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
          [exec]
          [exec] -1 release audit. The applied patch generated 427 release audit warnings (more than the trunk's current 425 warnings).

          The release audit warnings are on two html files: PigInputFormat.html and PigRecordReader.html

          Yan Zhou made changes -
          Attachment PIG-1518.patch [ 12452679 ]
          Richard Ding added a comment -

          +1. The patch looks good.

          A few minor points:

          • In PigSplit, the method add(InputSplit split) is not used and can be removed
          • In MapRedUtil, it would be better not to leave the debug verification code in the source code
          • In PigRecordReader, the code can be simplified if the initNextRecordReader() call is moved from the constructor to the initialize() method
          Yan Zhou added a comment -

          The add method of PigSplit has been removed. The debug code is left to facilitate future debugging work. The use of initNextRecordReader is pretty much cloned from org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader, and I'll leave it as is too.

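          The initNextRecordReader pattern referenced above can be illustrated with a small standalone model. This is hypothetical code, not Pig's or Hadoop's actual classes; each "split" is simplified to a list of String records. The idea: a reader spanning several underlying splits lazily opens the next split's reader once the current one is exhausted.

```java
// Hypothetical, simplified model of the CombineFileRecordReader pattern --
// not the actual Pig or Hadoop source. Each "split" is a list of records.
import java.util.Iterator;
import java.util.List;

class ChainedReader {
    private final Iterator<List<String>> splits; // remaining splits
    private Iterator<String> current;            // reader over current split

    ChainedReader(List<List<String>> allSplits) {
        this.splits = allSplits.iterator();
        // Pig's patch invokes initNextRecordReader() from the constructor;
        // Richard's review suggested moving this call to initialize().
        initNextReader();
    }

    // Opens a reader over the next split; returns false when none remain.
    private boolean initNextReader() {
        if (!splits.hasNext()) {
            current = null;
            return false;
        }
        current = splits.next().iterator();
        return true;
    }

    // Returns the next record, or null once every split is exhausted.
    String nextRecord() {
        while (current != null) {
            if (current.hasNext()) {
                return current.next();
            }
            if (!initNextReader()) {
                return null;
            }
        }
        return null;
    }
}
```

          Note how empty splits are skipped transparently: the while loop keeps chaining to the next underlying reader until a record is found or the split list runs out.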
          Yan Zhou made changes -
          Attachment PIG-1518.patch [ 12452873 ]
          Yan Zhou added a comment -

          Fix a typo; rebase on the latest trunk.

          Yan Zhou made changes -
          Attachment PIG-1518.patch [ 12452879 ]
          Yan Zhou made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Release Note Feature: combine splits of sizes smaller than the value of property "pig.maxCombinedSplitSize" or, if the property of "pig.maxCombinedSplitSize" is not set, the file system default block size of the load's location. This feature can be turned off through setting the property "pig.noSplitCombination" to true. When such a combination is performed, a log message like "Total input paths (combined) to process : 7" will be logged.

          This feature will be applicable if a user input, or an intermediate input, has many small files to be loaded that would otherwise cause many more "under-fed" mappers to be launched and potentially slowdown of the execution.

          This change will not cause any backward compatibility issue except if a loader implementation makes use of the PigSplit object passed through the prepareToRead method where a rebuild of the loader might be necessary as PigSplit's definition has been modified. However, currently we know of no external use of the object.

          In addition, if a loader implements IndexableLoadFunc, or implements OrderedLoadFunc and CollectableLoadFunc, its input splits won't be subject to possible combinations.
          Yan Zhou made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Yan Zhou added a comment -

          Minor polish of debugging code inside comments.

          Yan Zhou made changes -
          Attachment PIG-1518.patch [ 12453008 ]
          Yan Zhou made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Mridul Muralidharan added a comment -

          Might be a good idea to contact aruniyer, who maintains the FISH implementation.
          It is essentially built upon PigSplit and a custom loader.

          Yan Zhou made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Yan Zhou added a comment -

          Improvement on logging info.

          Yan Zhou made changes -
          Attachment PIG-1518.patch [ 12453092 ]
          Yan Zhou made changes -
          Attachment PIG-1518.patch [ 12453135 ]
          Yan Zhou added a comment -

          Rebased on the latest trunk.

          Yan Zhou made changes -
          Attachment PIG-1518.patch [ 12453150 ]
          Richard Ding added a comment -

          Patch is committed to trunk. Thanks Yan.

          Richard Ding made changes -
          Status Open [ 1 ] Resolved [ 5 ]
          Hadoop Flags [Reviewed]
          Resolution Fixed [ 1 ]
          Dmitriy V. Ryaboy added a comment -

          This is a great feature, thanks Yan.

          Could you comment on what the final solution was as far as PigStorage and OrderedLoadFunc? I see two ideas (yours and Ashutosh's) in the discussion, but not what the ultimate direction you took was.

          Yan Zhou added a comment -

          It is not combinable if the loader is a CollectableLoadFunc AND an OrderedLoadFunc. Since PigStorage is a CollectableLoadFunc but not an OrderedLoadFunc, it is combinable.

          Ashutosh Chauhan added a comment -

          Yan,
          Sorry for being late on this now that it's committed. But I think you have gotten it the other way around. A CollectableLoadFunc is combinable but an OrderedLoadFunc is not. Let's go over all three interfaces:

          • CollectableLoadFunc: A loader implementing it must make sure that all instances of a particular key are present in one split. If you combine splits of such a loader, it will still remain a CollectableLoadFunc because all instances of a key will still be in the same split after the combination. It is dictating a property within a split. Thus, it's combinable.

          • OrderedLoadFunc: OrderedLoadFunc insists that a loader implementing it must read splits in a well-defined order. If you combine the splits, that order may not hold. You can't combine splits for this loader. It's defining a property across multiple splits.

          • IndexableLoadFunc: Says that the loader is indexable, meaning that given a key it will get you as close as possible to that key. It inherently assumes the data is sorted and an index is built for it. Your combined splits may not remain sorted anymore. You can't combine splits for this interface either. It's defining a property across multiple splits.

          If you agree with the above, then PigStorage isn't combinable because

          public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadPushDown {}
          and
          public abstract class FileInputLoadFunc extends LoadFunc implements OrderedLoadFunc {}

          I also didn't get your logic for "CollectableLoadFunc AND an OrderedLoadFunc". It would help if you could explain that a bit.

          Yan Zhou added a comment -

          MergeJoinIndexer and IndexableLoadFunc are both not combinable.

          Regarding OrderedLoadFunc, the story is a bit more complex. First of all, its only non-overridden method, getSplitComparable, is only used in MergeJoinIndexer, which is already not combinable.

          The big issue is FileInputLoadFunc, which is extended by BinStorage, PigStorage and InterStorage. Semantically, I agree OrderedLoadFunc should not be combinable. However, FileInputLoadFunc's implementation of OrderedLoadFunc makes little sense in that its ordering is based on the (path, offset) pair. This is an ordering, but just an arbitrary ordering. Mathematically one can establish any arbitrary ordering over a discrete set of data. But the point is how the ordering is used. For our purposes, the ordering should be related to some keys used in data manipulation, for which (path, offset) does not serve the purpose. Put another way, a FileInputLoadFunc implicitly still requires that the storage give out splits in some key ordering. If that storage ordering does not actually exist, FileInputLoadFunc as an OrderedLoadFunc will have no use for its "sortedness" because the ordering is just, well, arbitrary. The three extensions of FileInputLoadFunc work on generic data storage. Unless they work on sorted data in general, they should not be OrderedLoadFuncs.

          The other use of OrderedLoadFunc, apart from its non-overridden method getSplitComparable, is by map-side cogroup. But it does not check whether the sort key is the join key, which is critical for correctness. It also requires a CollectableLoadFunc to work properly.

          Since we do not want to break backward compatibility, and the only use of OrderedLoadFunc in Pig, except for MergeJoinIndexer which is already excluded from combining, is in map-side cogroup with CollectableLoadFunc, I mark "CollectableLoadFunc AND OrderedLoadFunc" as non-combinable.

          In the future, we should really clean up the OrderedLoadFunc from FileInputLoadFunc and let the getSplitComparable method provide key-related info rather than the (path, offset) pair. Backward compatibility may need to be addressed too. Only then will the water become clearer and I'll be OK with adjusting the non-combinable setting accordingly.

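          The rule described above can be restated as a tiny standalone sketch. This is hypothetical illustration code, not the Pig source: the marker interfaces are redeclared locally so the example compiles on its own, and the example loader classes are made-up names, not Pig classes. Splits stay uncombined when the loader is an IndexableLoadFunc, or is both a CollectableLoadFunc and an OrderedLoadFunc.

```java
// Hypothetical sketch of the combinability rule -- not the actual Pig
// source. Marker interfaces are redeclared locally for self-containment.
interface LoadFunc {}
interface IndexableLoadFunc extends LoadFunc {}
interface OrderedLoadFunc extends LoadFunc {}
interface CollectableLoadFunc extends LoadFunc {}

class CombinePolicy {
    // Splits are NOT combined when the loader implements IndexableLoadFunc,
    // or implements both CollectableLoadFunc and OrderedLoadFunc.
    static boolean isCombinable(LoadFunc loader) {
        if (loader instanceof IndexableLoadFunc) {
            return false; // merge-join index path: must not combine
        }
        if (loader instanceof CollectableLoadFunc
                && loader instanceof OrderedLoadFunc) {
            return false; // map-side cogroup path: must not combine
        }
        return true;
    }
}

// Illustrative loaders (hypothetical names, not Pig classes):
class PlainLoader implements LoadFunc {}
class IndexedLoader implements IndexableLoadFunc {}
class CogroupLoader implements CollectableLoadFunc, OrderedLoadFunc {}
```

          Under this model a plain loader's splits are combinable, while an indexable loader or a cogroup-capable ordered loader opts out, matching the exclusions in the release note.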
          Olga Natkovich added a comment -

          After discussion with Ashutosh and Yan, the agreement is that in addition to checking interfaces we also need to check whether we are taking advantage of the loader properties before deciding whether to combine or not.

          For instance, even if the loader implements OrderedLoadFunc but there is no merge join in the script, we can still combine.

          Yan, please, compile the list of valid combinations and update the patch, thanks.

          Yan Zhou added a comment -

          In summary, the following functionalities won't see splits combined on loads:

          1) map-side cogroup;
          2) merge join.

          Yan Zhou added a comment -

          All other functionalities except for the two mentioned in the previous comment will see splits combined by default, if necessary.

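          Per the final release note, the behavior is controlled through job properties. A sketch of how a Pig script might set them (the 128 MB value is illustrative, not a recommendation):

```pig
-- turn split combination off entirely
set pig.splitCombination false;

-- or leave it on and cap the combined split size at 128 MB (value in bytes);
-- when unset, the file system's default block size is used as the cap
set pig.maxCombinedSplitSize 134217728;
```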
          Justin Sanders added a comment -

          Backported PIG-1518 to the 0.7.0 branch and wanted to share in case anyone else was trying to do the same thing.

          Justin Sanders made changes -
          Attachment PIG-1518-0.7.0.patch [ 12453951 ]
          Olga Natkovich added a comment -

          Hi Justin, thanks for the patch!

          I don't think we can commit it to the 0.7 branch because we have already done the official 0.7 release and we can't introduce non-backward-compatible changes to this branch.

          However, I think it is great to have the patch on the JIRA so that anybody who is interested in this patch can apply it to their own tree and run with it. We have done similar things in the past (with hadoop versions) and it worked fine.

          Yan Zhou made changes -
          Release Note Feature: combine splits of sizes smaller than the value of property "pig.maxCombinedSplitSize" or, if the property of "pig.maxCombinedSplitSize" is not set, the file system default block size of the load's location. This feature can be turned off through setting the property "pig.noSplitCombination" to true. When such a combination is performed, a log message like "Total input paths (combined) to process : 7" will be logged.

          This feature will be applicable if a user input, or an intermediate input, has many small files to be loaded that would otherwise cause many more "under-fed" mappers to be launched and potentially slowdown of the execution.

          This change will not cause any backward compatibility issue except if a loader implementation makes use of the PigSplit object passed through the prepareToRead method where a rebuild of the loader might be necessary as PigSplit's definition has been modified. However, currently we know of no external use of the object.

          In addition, if a loader implements IndexableLoadFunc, or implements OrderedLoadFunc and CollectableLoadFunc, its input splits won't be subject to possible combinations.
          Feature: combine splits of sizes smaller than the value of property "pig.maxCombinedSplitSize" or, if the property of "pig.maxCombinedSplitSize" is not set, the file system default block size of the load's location. This feature can be turned off through setting the property "pig.splitCombination" to "false". When such a combination is performed, a log message like "Total input paths (combined) to process : 7" will be logged.

          This feature will be applicable if a user input, or an intermediate input, has many small files to be loaded that would otherwise cause many more "under-fed" mappers to be launched and potentially slowdown of the execution.

          This change will not cause any backward compatibility issue except if a loader implementation makes use of the PigSplit object passed through the prepareToRead method where a rebuild of the loader might be necessary as PigSplit's definition has been modified. However, currently we know of no external use of the object.

          This change also requires the loader to be stateless across the invocations to the prepareToRead method. That is, the method should reset any internal states that are not affected by the RecordReader argument.
          Otherwise, this feature should be disabled.

          In addition, if a loader implements IndexableLoadFunc, or implements OrderedLoadFunc and CollectableLoadFunc, its input splits won't be subject to possible combinations.
          Olga Natkovich made changes -
          Status Resolved [ 5 ] Closed [ 6 ]
          Aniket Mokashi made changes -
          Link This issue relates to PIG-2462 [ PIG-2462 ]

            People

            • Assignee: Yan Zhou
            • Reporter: Olga Natkovich
            • Votes: 0
            • Watchers: 3