Pig / PIG-3047

Check the size of a relation before adding it to distributed cache in Replicated join

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.12.1
    • Component/s: None
    • Labels:
      None

      Description

      Right now, if someone makes a mistake and puts the large relation last, Pig will copy a huge file into the distributed cache, and it will take a long time before the job eventually fails. It would be better to check that the relation is of a reasonable size before copying it.
      <1 GB ?

      1. PIG-3047-3.patch
        14 kB
        Aniket Mokashi
      2. PIG-3047-2.patch
        12 kB
        Aniket Mokashi
      3. fix_pig_3047_1.diff
        6 kB
        Ido Hadanny
      4. fix_pig_3047.diff
        6 kB
        Ido Hadanny

        Activity

        Aniket Mokashi added a comment -

        Thanks Koji Noguchi for catching this. I have added the missing imports to TestFRJoin2.java.

        Aniket Mokashi added a comment -

        It's missing an import statement which was not required by trunk. I will commit the change shortly after test-commit passes.

        Koji Noguchi added a comment -

        Seems like compile-test is failing on test/TestFRJoin2.java after the commit. Can you take a look?

        Sorry, this is on 0.12 branch.

        Koji Noguchi added a comment -

        Seems like compile-test is failing on test/TestFRJoin2.java after the commit. Can you take a look?

        Aniket Mokashi added a comment -

        Committed to trunk and 0.12 branch!
        Thanks Ido Hadanny and Cheolsoo Park.

        Aniket Mokashi added a comment -

        RB: https://reviews.apache.org/r/14964/

        Aniket Mokashi added a comment -

        Your patch does not have the right fix for this problem. If you look closely, the first fragment of replFiles is not added to the distributed cache, and hence we should not add all of replFiles. The first patch works because, at the time this check is made, replFiles[i] is null and hence it is skipped. Also, since you are not recursively listing the files, you do not hit the problem we hit in the second patch (the hidden _logs directory). The second patch's test case shows big numbers because of the hidden _logs directory in the replicated data path. I fixed the Utils.getPathLength method to ignore hidden files. This also affects the reducer estimation code path by ignoring hidden directories while listing input, which I feel is the right way to go.
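The effect of skipping hidden entries while summing sizes can be sketched as follows. This is not the actual Utils.getPathLength code: it uses java.nio on a local directory instead of the Hadoop FileSystem API, the class name VisiblePathLength is hypothetical, and treating names that start with "_" or "." as hidden is an assumption based on the usual Hadoop convention. The file sizes in main are made up.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

final class VisiblePathLength {
    /** Assumed convention: names starting with '_' or '.' are hidden. */
    static boolean isHidden(Path p) {
        Path name = p.getFileName();
        if (name == null) {
            return false;
        }
        String s = name.toString();
        return s.startsWith("_") || s.startsWith(".");
    }

    /** Recursively sums file sizes under root, skipping hidden files and directories. */
    static long length(final Path root) throws IOException {
        final long[] total = {0L};
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
                // Never skip the root itself, only hidden directories below it.
                if (!dir.equals(root) && isHidden(dir)) {
                    return FileVisitResult.SKIP_SUBTREE;
                }
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                if (!isHidden(file)) {
                    total[0] += attrs.size();
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return total[0];
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("repl");
        Files.write(dir.resolve("part-00000"), new byte[17]);
        Path logs = Files.createDirectory(dir.resolve("_logs"));
        Files.write(logs.resolve("history"), new byte[1000]);
        System.out.println(length(dir)); // prints 17: the _logs subtree is not counted
    }
}
```

Without the hidden-entry filter, the same directory would report 1017 bytes, which is the kind of inflated number the second patch's test case ran into.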

        Aniket Mokashi added a comment -

        Ido Hadanny, thanks for updating the patch!

        A few more comments:

        1. The comment on the test still mentions the old property name.
        2. We typically catch the exception in test and assert the error message. Can you modify your test to do so?
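The catch-and-assert pattern requested in item 2 could look roughly like this. It is a plain-Java sketch without JUnit so that it stays self-contained: the class ExpectedFailureCheck and the method launchOversizedJoin are hypothetical stand-ins, not code from the patch or from TestFRJoin2.java, and the sizes in the message are made up. The helper walks the cause chain because the interesting message may sit in a nested "Caused by" exception.

```java
final class ExpectedFailureCheck {
    /** Returns true if any throwable in the cause chain carries the expected text. */
    static boolean causeChainContains(Throwable top, String expected) {
        for (Throwable t = top; t != null; t = t.getCause()) {
            String msg = t.getMessage();
            if (msg != null && msg.contains(expected)) {
                return true;
            }
        }
        return false;
    }

    /** Hypothetical stand-in for launching a job whose replicated input is too large. */
    static void launchOversizedJoin() {
        Exception root = new IllegalStateException(
            "Replicated input files size: 20 exceeds pig.join.replicated.max.bytes: 10");
        throw new RuntimeException("job compilation failed", root);
    }

    public static void main(String[] args) {
        try {
            launchOversizedJoin();
            // Reaching this line means the size check did not fire.
            throw new AssertionError("expected the join to be rejected");
        } catch (RuntimeException e) {
            if (!causeChainContains(e, "exceeds pig.join.replicated.max.bytes")) {
                throw new AssertionError("unexpected failure: " + e);
            }
        }
        System.out.println("ok");
    }
}
```

Compared with an expected-exception annotation, this form fails the test when a different exception is thrown for an unrelated reason.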

        I tried your patch on trunk and found some problems with it.

        1. If I modify the test to remove the catch exception annotation and increase the configuration value to 18, I get the following exception stack:
          Caused by: org.apache.pig.impl.plan.VisitorException: ERROR 0: Replicated input files size: 64063 exceeds pig.join.replicated.max.bytes: 18
          	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler$JoinDistributedCacheVisitor.visitFRJoin(JobControlCompiler.java:1578)
          	... 42 more
          
        2. With your old patch, I get the following exception stack:
          Caused by: org.apache.pig.impl.plan.VisitorException: ERROR 0: Replicated input files size: 17 exceeds pig.fr_join.max.bytes.size: 14
          	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler$JoinDistributedCacheVisitor.visitFRJoin(JobControlCompiler.java:1579)
          	... 42 more
          
        Ido Hadanny added a comment -

        Thanks Aniket Mokashi! Ran Yuchtman and I implemented your suggestions and uploaded the fixed patch. Please re-review.

        Aniket Mokashi added a comment -

        Also, how about renaming pig.fr_join.max.bytes.size to pig.join.replicated.max.bytes (or any other suitable name)? Please also move it to the PigConfiguration class and add an entry to pig.properties.

        Aniket Mokashi added a comment -

        This is excellent. Option 3 should be good enough to cover the scope of this JIRA.
        A few comments on the implementation:

        • if(!UriUtil.isHDFSFile(location)) - is that required?
        • Can you use Utils.getPathLength to get the size of the files?

        Ran Yuchtman added a comment -

        We only implemented option 3 of Prashant's suggestion (with a default of 1G), but it should be pretty easy to implement the rest.

        Ido Hadanny added a comment -

        When the replicated input size is bigger than pig.fr_join.max.bytes.size, we throw a nice exception (ryuchtman + ihadanny).
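A minimal sketch of such a guard, assuming the limit is read from a java.util.Properties object. The class ReplicatedSizeGuard is hypothetical and is not the code in JobControlCompiler. The sketch uses the later property name pig.join.replicated.max.bytes and mirrors the message format seen in the exception traces in this thread. The thread mentions a default of 1G, and the exact byte count used here is an assumption.

```java
import java.util.Properties;

final class ReplicatedSizeGuard {
    static final String MAX_BYTES_KEY = "pig.join.replicated.max.bytes";
    // Assumption: "1G" taken as 10^9 bytes; the thread does not give the exact value.
    static final long DEFAULT_MAX_BYTES = 1_000_000_000L;

    /** Reads the configured limit, falling back to the assumed default. */
    static long maxBytes(Properties conf) {
        String raw = conf.getProperty(MAX_BYTES_KEY, Long.toString(DEFAULT_MAX_BYTES));
        return Long.parseLong(raw.trim());
    }

    /** Fails fast, before anything is copied, if the replicated inputs are too large. */
    static void check(long totalBytes, Properties conf) {
        long max = maxBytes(conf);
        if (totalBytes > max) {
            throw new IllegalStateException("Replicated input files size: " + totalBytes
                + " exceeds " + MAX_BYTES_KEY + ": " + max);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty(MAX_BYTES_KEY, "14");
        check(10L, conf); // within the limit, returns normally
        try {
            check(17L, conf);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the key in one constant follows the suggestion elsewhere in the thread to hold configuration keys in a single class rather than scattering string literals.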

        Ido Hadanny added a comment -

        No, I was planning to start today. Is that OK with you?

        Aniket Mokashi added a comment -

        Ido Hadanny, have you already started working on this?

        Julien Le Dem added a comment -

        sounds good to me

        Jonathan Coveney added a comment -

        Prashant: that sounds good to me. Just make it configurable (as you proposed) and have those configuration keys in the PigConfiguration class and it should be good.

        Prashant Kommireddi added a comment -

        Julien, Jon - what do you guys think about the above proposal?

        Prashant Kommireddi added a comment -

        We could possibly do something similar to Hadoop's ShuffleRamManager, minus the wait()/notify() on memory freeing-up

        1. % of max heap/available memory to JVM
        2. % could be specified by user
        3. User could also choose to specify a certain size in bytes
        4. Ship pig with a conservative % to start with

        Thoughts?
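The four options above could be combined along these lines. This is only a sketch of the proposal, not anything that was committed: the class ReplicatedJoinLimit, its parameter names, and the 10% fraction in main are all hypothetical. Note that Runtime.maxMemory() reports the heap of the JVM running this code, which is not necessarily the heap of the map task that will load the replicated relation.

```java
final class ReplicatedJoinLimit {
    /**
     * Resolves the size limit in bytes.
     * An explicit byte count (option 3) wins; otherwise a fraction of the
     * maximum heap is used (options 1 and 2).
     */
    static long resolve(Long explicitBytes, double heapFraction, long maxHeapBytes) {
        if (explicitBytes != null) {
            return explicitBytes;
        }
        if (heapFraction <= 0.0 || heapFraction > 1.0) {
            throw new IllegalArgumentException("heapFraction must be in (0, 1]: " + heapFraction);
        }
        return (long) (maxHeapBytes * heapFraction);
    }

    public static void main(String[] args) {
        long heap = Runtime.getRuntime().maxMemory();
        // Option 4: a conservative fraction; 0.10 is an arbitrary illustrative value.
        System.out.println("fraction-based limit: " + resolve(null, 0.10, heap));
        System.out.println("explicit limit: " + resolve(512L * 1024 * 1024, 0.10, heap));
    }
}
```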

        Julien Le Dem added a comment -

        I'm open to suggestions regarding what is a reasonable size for a relation used in a replicated join.
        Should it be a % of the max heap size?
        I'd avoid parsing mapred.child.java.opts though. Is there a better way?

        Jonathan Coveney added a comment -

        I agree with Prashant that hardcoding a size is a bit janky.

        Prashant Kommireddi added a comment -

        Hi Julien,

        What do you think about setting this size as a % based on "mapred.child.java.opts"?


          People

          • Assignee:
            Aniket Mokashi
            Reporter:
            Julien Le Dem
          • Votes:
            0
            Watchers:
            7
