
[PIG-2573] Automagically setting parallelism based on input file size does not work with HCatalog

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.11
    • Component/s: None
    • Labels: None

      Description

      PIG-2334 was helpful in understanding this issue. The short version is that the input file size is only computed if the path begins with a whitelisted prefix, currently:

      • /
      • hdfs:
      • file:
      • s3n:

      As HCatalog locations use the form dbname.tablename, the input file size is not computed and the size-based parallelism optimization breaks (the sketch below illustrates the prefix check).
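
      To make the failure mode concrete, here is a minimal, purely illustrative sketch of the kind of prefix whitelist described above; this is not the actual Pig source, and the class and method names are made up for illustration:

        // Illustrative only -- not the actual Pig source. A check of this shape
        // is what the description refers to: input size is computed only for
        // locations that start with a whitelisted prefix, so HCatalog-style
        // "dbname.tablename" locations fall through and totalInputFileSize stays 0.
        public final class InputSizeWhitelistSketch {

            private static final String[] SUPPORTED_PREFIXES = {"/", "hdfs:", "file:", "s3n:"};

            static boolean isSizeComputable(String location) {
                for (String prefix : SUPPORTED_PREFIXES) {
                    if (location.startsWith(prefix)) {
                        return true;
                    }
                }
                return false; // e.g. "mydb.mytable" -> false
            }

            public static void main(String[] args) {
                System.out.println(isSizeComputable("hdfs://nn/logs/day1")); // true
                System.out.println(isSizeComputable("mydb.mytable"));        // false
            }
        }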

      DETAILS:

      I discovered this issue while comparing two runs of the same script, one loading regular HDFS paths and one loading HCatalog db.table names. I just happened to notice the difference in the "Setting number of reducers" line.

      Loading HDFS files, the number of reducers is set to 99:
      2012-03-08 01:33:56,522 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=98406674162
      2012-03-08 01:33:56,522 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 99
      
      Loading with an HCatalog db.table name, the number of reducers is set to 1:
      2012-03-08 01:06:02,283 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=0
      2012-03-08 01:06:02,283 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
      

      Possible fix: Pig should just ask the loader for the size of its inputs rather than special-casing certain location types.

      Attachments

      1. PIG-2573_move_getinputbytes_to_loadfunc.diff
        12 kB
        Travis Crawford
      2. PIG-2573_get_size_from_stats_if_possible.diff
        19 kB
        Travis Crawford
      3. PIG-2573_get_size_from_stats_if_possible_4.diff
        22 kB
        Travis Crawford
      4. PIG-2573_get_size_from_stats_if_possible_3.diff
        20 kB
        Travis Crawford
      5. PIG-2573_get_size_from_stats_if_possible_2.diff
        20 kB
        Travis Crawford

        Activity

        Daniel Dai added a comment -

        Yes, we should put the logic into LoadFunc. There will also need to be a patch for HCatLoader on the HCat side. Is anyone working on a patch?

        Travis Crawford added a comment -

        Yup, I'm looking into this. After Pig supports it I can also do the HCatLoader work, since it's what I'm primarily interested in.

        Travis Crawford added a comment -

        The attached patch shows what moving the "get size of input" logic into LoadFunc looks like.

        I have some concerns about how invasive this approach is. Specifically, it requires loaders to implement a new getLocation method for the optimization to work. While I can update the loaders shipped with Pig, this breaks the optimization for all other loaders out there. Option: fall back to the current behavior if the loader returns null for getLocation (meaning it's likely using the default implementation).

        If we like this general approach I can polish & update all the included loaders. If we don't like this approach - any suggestions?
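
        For illustration only, the fallback described above might look roughly like the following; the getLocation()/getInputSizeInBytes() names come from this comment rather than from the attached diff or any released Pig API:

          // Hypothetical sketch of the fallback idea; names are illustrative.
          interface SizeReportingLoader {
              /** May return null when the loader keeps the default implementation. */
              String getLocation();

              long getInputSizeInBytes(String location) throws java.io.IOException;
          }

          final class ParallelismInputSizeSketch {
              static long inputSize(SizeReportingLoader loader, long fallbackSizeFromFs)
                      throws java.io.IOException {
                  String location = loader.getLocation();
                  if (location == null) {
                      // Loader did not override getLocation(): keep the current
                      // filesystem-based behavior so the optimization isn't broken.
                      return fallbackSizeFromFs;
                  }
                  return loader.getInputSizeInBytes(location);
              }
          }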

        Bill Graham added a comment -

        Another approach that would be backward compatible would be to introduce a new interface that a LoadFunc can implement to advertise that it supports load statistics, like input size. Something similar to how StoreFuncs include a number of optional interfaces.

        Also, should we think more broadly about being able to return other info besides just total input bytes? For example, we could instead return a new LoadStats object (or the InputStats class) that could encapsulate more than just the input size: things like input size, records read, etc. It would be a move towards pushing stats collection into the loaders and store funcs.
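
        One purely hypothetical shape such an optional interface could take; the names here are illustrative, and as the next comment points out, the eventual patch reused the existing LoadMetadata interface instead:

          // Hypothetical sketch of an optional statistics interface; not a Pig API.
          interface LoadStatisticsProvider {
              /** Stats the loader can report for a location, or null if unknown. */
              LoadStats getLoadStats(String location) throws java.io.IOException;
          }

          // A stats holder that could grow beyond input size (records read, etc.).
          final class LoadStats {
              private final long sizeInBytes;
              private final long numRecords;

              LoadStats(long sizeInBytes, long numRecords) {
                  this.sizeInBytes = sizeInBytes;
                  this.numRecords = numRecords;
              }

              long getSizeInBytes() { return sizeInBytes; }
              long getNumRecords()  { return numRecords; }
          }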

        Dmitriy V. Ryaboy added a comment -

        Bill, I know, let's call it LoadMetadata

        Discussed this with Travis, he'll just do instanceof LoadMetadata and go from there.

        Travis Crawford added a comment -

        Patch has been updated to get the size from loader-reported statistics, if possible. If not possible, the current behavior remains.
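
        A simplified sketch of that approach (not the attached patch), assuming the LoadMetadata.getStatistics(location, job) and ResourceStatistics.getmBytes() accessors from Pig's public interfaces; returning -1 stands in for "fall back to the existing filesystem-based lookup":

          import java.io.IOException;
          import org.apache.hadoop.mapreduce.Job;
          import org.apache.pig.LoadFunc;
          import org.apache.pig.LoadMetadata;
          import org.apache.pig.ResourceStatistics;

          // Simplified sketch of the loader-reported-statistics path described above.
          final class LoaderInputSizeSketch {
              /**
               * Size in bytes reported by the loader's statistics, or -1 if the
               * loader does not implement LoadMetadata or reports no size, in
               * which case the caller keeps the current filesystem-based behavior.
               */
              static long sizeFromLoader(LoadFunc loader, String location, Job job)
                      throws IOException {
                  if (!(loader instanceof LoadMetadata)) {
                      return -1;
                  }
                  ResourceStatistics stats = ((LoadMetadata) loader).getStatistics(location, job);
                  if (stats == null || stats.getmBytes() == null) {
                      return -1;
                  }
                  return stats.getmBytes() * 1024L * 1024L; // getmBytes() is in megabytes
              }
          }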

        Dmitriy V. Ryaboy added a comment -

        Nice work, Travis.

        Tests pass.

        I think we might as well take PigStorageWithStatistics and move it into PigStorage. Might make sense to save a transient LRU map of location->stats in case getStats gets called a few times, so we don't keep hitting the NN for file sizes over and over.
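
        A minimal sketch of the transient location->stats LRU cache suggested here, assuming a plain access-ordered LinkedHashMap; the capacity and the use of ResourceStatistics as the value type are arbitrary choices:

          import java.util.LinkedHashMap;
          import java.util.Map;
          import org.apache.pig.ResourceStatistics;

          // Minimal LRU cache sketch: keeps the most recently used location->stats
          // entries so repeated getStats() calls avoid extra NameNode round trips.
          final class StatsCacheSketch extends LinkedHashMap<String, ResourceStatistics> {
              private static final int MAX_ENTRIES = 16; // arbitrary

              StatsCacheSketch() {
                  super(MAX_ENTRIES, 0.75f, true); // true = access order, i.e. LRU
              }

              @Override
              protected boolean removeEldestEntry(Map.Entry<String, ResourceStatistics> eldest) {
                  return size() > MAX_ENTRIES; // evict the least-recently-used location
              }
          }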

        It looks like the current behavior is that if you have multiple POLoads, and only a subset of them implement LoadMetadata and return non-0 size, FS is used. Meaning, if I have 2 loaders, and one of them reports size and the other does not, the first loader's reported size is ignored. That's ok (better than it is now!) but not ideal. Perhaps we can move the logic of checking metadata or the FS into the inner loop of getInputSizeFromLoadMetadata?

        Also, just stylistically, let's rename that method to getInputSizeFromMetadata (more readable, and refers to the concept, not an interface).

        Travis Crawford added a comment -

        PIGSTORAGEWITHSTATISTICS COMMENT:

        Originally I did something similar to what you suggested, but after a bit more thought I kept PigStorage unchanged and used a test-specific loader. Since we fall back to the existing "get size from supported filesystems" lookup, PigStorage already has this feature for most users. JobControlCompiler and PigStorage would call the same utility method to report size, so I think the code actually becomes more complex by updating PigStorage.

        The goal here is letting a loader report the size of its input for non-filesystems (hcatalog db.table names, rows from hbase/vertica/mysql/...), or when doing something fancy with files on a filesystem (indexed files where blocks/splits are pre-filtered). If you're doing something fancy you probably have a fancy loader too.

        PARTIAL SIZE REPORTING COMMENT:

        Having size be all-or-none was intentional. It seemed very confusing for Pig to base a decision on one number (and log that input size) and then have the MR job read a different amount of data. I think it's best to keep the current behavior and only make this optimization if it's based on the actual input size.

        METHOD NAME COMMENT:

        How does getInputSizeFromLoader sound?

        Travis Crawford added a comment -

        The updated patch renames the method, and the total size is reported as the size of the inputs for which a size is available. Rereading the existing code, this is how it currently behaves.

        Julien Le Dem added a comment -

        Hey Travis,
        Looks good to me.

        Some comments:

        • in test/org/apache/pig/test/PigStorageWithStatistics.java
              private Long getInputmBytes() throws IOException {
          ...
                  return inputBytes / 1024;
              }
          

          I guess it should be / 1024 again to return megabytes

        • Let's add getByteSize() and setByteSize(size) to org.apache.pig.ResourceStatistics (with backward-compatible implementations of getmBytes); see the sketch below
        • in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java, the following line seems unnecessary as long as the file system implementation returns a size. But we can open this as a separate issue.
           if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
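
        A purely illustrative sketch of the getByteSize()/setByteSize() suggestion above, keeping the megabyte accessors backward compatible; as the next comment explains, the final patch kept mBytes unchanged instead:

          // Illustrative only: byte-based accessors with backward-compatible
          // megabyte accessors delegating to them; not what the final patch did.
          public class ResourceStatisticsSketch {
              private Long sizeInBytes;

              public Long getByteSize() {
                  return sizeInBytes;
              }

              public void setByteSize(Long size) {
                  this.sizeInBytes = size;
              }

              /** Backward-compatible accessor: size in megabytes, as before. */
              public Long getmBytes() {
                  return sizeInBytes == null ? null : sizeInBytes / (1024 * 1024);
              }

              /** Backward-compatible mutator: takes megabytes, as before. */
              public void setmBytes(Long mBytes) {
                  this.sizeInBytes = mBytes == null ? null : mBytes * 1024 * 1024;
              }
          }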
          
        Travis Crawford added a comment -

        Good comments Julien, thanks.

        Please see the comments for why we keep mBytes in ResourceStatistics - basically it's public and changing it could break things.

        Travis Crawford added a comment -

        Whoops, the previous diff missed a file. I ran test-commit with this one.

        Julien Le Dem added a comment -

        ant test-patch

        [exec] -1 overall.
        [exec]
        [exec] +1 @author. The patch does not contain any @author tags.
        [exec]
        [exec] +1 tests included. The patch appears to include 5 new or modified tests.
        [exec]
        [exec] -1 javadoc. The javadoc tool appears to have generated 1 warning messages.
        [exec]
        [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
        [exec]
        [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
        [exec]
        [exec] -1 release audit. The applied patch generated 541 release audit warnings (more than the trunk's current 534 warnings).

        the new warnings are unrelated to this patch.

        +1 on my side


          People

          • Assignee: Travis Crawford
          • Reporter: Travis Crawford
          • Votes: 0
          • Watchers: 6
