Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.2.0
    • Fix Version/s: 0.2.0
    • Component/s: None
    • Labels: None
    • Patch Info: Patch Available

      Description

      Fragment Replicate Join (FRJ) is useful when we want a join between a huge table and a very small table (small enough to fit in memory) and the join doesn't expand the data by much. The idea is to distribute the processing of the huge file by fragmenting it and replicating the small file to all machines receiving a fragment of the huge file. Because the entire small file is available, the join becomes a trivial task without needing any break in the pipeline. Exhaustive tests have been done to determine the improvement we get out of FRJ. Here are the details: http://wiki.apache.org/pig/PigFRJoin
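The strategy described above can be sketched in a few lines. This is an illustrative Python sketch of the general fragment-replicate join idea, not Pig's actual Java implementation; the function name, tables, and column indices are made up for the example.

```python
# Fragment-replicate join sketch: the small (replicated) table is loaded
# whole into a hash map keyed by the join key, and each fragment of the
# large table is streamed against it -- no shuffle or pipeline break.

def replicated_join(fragment, small_table, frag_key, small_key):
    """Join one fragment of the large input against the fully
    replicated small input (hypothetical helper, for illustration)."""
    # Build the hash map once from the replicated table.
    lookup = {}
    for row in small_table:
        lookup.setdefault(row[small_key], []).append(row)
    # Stream the fragment; each row probes the map directly.
    for row in fragment:
        for match in lookup.get(row[frag_key], []):
            yield row + match

big = [(1, "a"), (2, "b"), (3, "c")]
small = [(2, "x"), (3, "y")]
print(list(replicated_join(big, small, 0, 0)))
# → [(2, 'b', 2, 'x'), (3, 'c', 3, 'y')]
```

Because the whole small table is resident in memory on every machine, the join of each fragment completes in a single map pass.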

      The patch makes changes to parts of the code where new operators are introduced. Currently, when a new operator is introduced, its alias is not set. For schema computation I have modified this behaviour to set the alias of the new operator to that of its predecessor. The logical side of the patch mimics the cogroup behavior as join syntax closely resembles that of cogroup. Currently, this patch doesn't have support for joins other than inner joins. The rest of the code has been documented.

      1. PIG-554-v4.patch
        79 kB
        Pradeep Kamath
      2. PIG-554-v3.patch
        58 kB
        Pradeep Kamath
      3. frjofflat1.patch
        75 kB
        Shravan Matthur Narayanamurthy
      4. frjofflat.patch
        60 kB
        Shravan Matthur Narayanamurthy

        Activity

        Shravan Matthur Narayanamurthy added a comment -

        Cool! Thanks everyone for reviewing and updating the patch

        Alan Gates added a comment -

        Patch v4 checked in. Thanks Shravan for all your work on this. Initial tests show speed ups in the 2-4x range. This is huge.

        Pradeep Kamath added a comment -

        Changes in new patch (attached):
        1) The HashMap now has (tuple, List<Tuple>) to address the concern that Bag would be worse spacewise than a List<Tuple>. BagFactory now has a method newDefaultBag(List<Tuple>) which will create a DefaultDataBag out of the List<Tuple> by taking ownership of the list and without copying the elements. This way in POFRJoin.getNext() we can create a bag out of the List<Tuple> without much overhead.
        2) Added back the unit test - TestFRJoin - the change in this file is to use Util.createInputFile() to create the input file for the tests on the minicluster DFS rather than the local file system. It uses Util.deleteFile() to delete the file after each test run.
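The ownership-transfer idea in (1) can be illustrated with a small sketch (Python for brevity, not Pig's actual BagFactory/DefaultDataBag code; the class name mirrors Pig's but the implementation here is hypothetical): the bag keeps a reference to the caller's list instead of copying its elements, so building a bag from the hash map's List<Tuple> in getNext() is constant-time.

```python
# Sketch of newDefaultBag(List<Tuple>) taking ownership of the list:
# the bag stores the list object itself, with no element copy.

class DefaultBag:
    def __init__(self, tuples):
        # Take ownership: keep the caller's list, O(1), no copying.
        self._tuples = tuples

    def __iter__(self):
        return iter(self._tuples)

    def __len__(self):
        return len(self._tuples)

matches = [(1, "a"), (1, "b")]   # value list from the join hash map
bag = DefaultBag(matches)        # wraps the list, does not copy it
assert bag._tuples is matches    # same object -- ownership transferred
```

The trade-off is that the caller must not keep mutating the list afterwards, which is exactly why the factory method is described as "taking ownership".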

        Shravan Matthur Narayanamurthy added a comment -

        (1) is a good catch! Really hadn't thought about this.

        (2) Hashtable to HashMap is fine, but should we be storing DataBag instead of List? I thought DataBag took more space than List, which would decrease the number of tuples we can handle.

        I think you forgot to include TestFRJoin in the patch. Rest looks good to me.

        Pradeep Kamath added a comment -

        Changes in new patch submitted (PIG-554-v3.patch):
        1) The code was not handling the case where the join key was "*" as illustrated in the script below:

        a = load ... as (a:chararray, b:chararray);
        b = load ... as (a:chararray, b);
        c = join a by *, b by * using "replicated";
        dump c;
        

        In the above script the join column is a tuple whose second column in the second input needs to be cast so that the key types for both inputs match. For this, the ProjectStarTranslator should have an implementation of visit(LOFRJoin) so that the Project is translated to multiple Project operations. After this translation, the type checking code will correctly decipher the join key to be a tuple and insert the necessary cast.
        2) In POFRJoin, HashMap is used instead of HashTable to avoid any performance loss due to synchronization code in HashTable (HashMap is not synchronized). Also this HashMap has (tuple, DataBag) as Entries instead of the earlier (tuple, List<Tuple>) to avoid constructing bags out of the List in getNext()
        3) Changed a couple of System.out.println() statements to log.debug()
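Item 1 above can be illustrated with a small, hypothetical sketch (Python, not Pig's actual ProjectStarTranslator): expanding `*` into one projection per column yields a tuple key, and the cast inserted by the type checker makes the untyped (bytearray) column comparable with its chararray counterpart.

```python
# Hypothetical sketch of expanding a "*" join key into per-column
# projections, applying type-checker-inserted casts so both inputs
# produce comparable tuple keys.

def to_chararray(v):
    # Hypothetical cast: bytearray -> chararray.
    return v.decode() if isinstance(v, (bytes, bytearray)) else v

def identity(v):
    return v

def expand_star_key(row, casts):
    """Project every column, casting each so the tuple keys from
    the two inputs compare equal when the data matches."""
    return tuple(c(v) for v, c in zip(row, casts))

# a: (a:chararray, b:chararray) -- no casts needed.
key_a = expand_star_key(("x", "y"), [identity, identity])
# b: (a:chararray, b) -- second column is bytearray, cast inserted.
key_b = expand_star_key(("x", b"y"), [identity, to_chararray])
assert key_a == key_b == ("x", "y")
```

Without the cast, the second input's key would contain a bytearray and never match the first input's chararray key, which is the bug the patch fixes.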

        Shravan Matthur Narayanamurthy added a comment -

        1) Consider the following script:
        A = load 'file1';
        B = load 'file2';
        C = filter A by $0>10;
        D = filter B by $0<10;
        E = join C by $0, D by $0 using replicated;

        We need to materialize the result of D before we can use it as the replicated input. Also, the DistributedCache has not been used because it doesn't support directories, if I recall correctly (we would have to handle many complications manually), and the load specification in Pig can contain regexps too. Also, since the replicated file is small, it doesn't make much of a difference.

        2) Instead of writing all the code to handle the various combinations of the group item specification, I chose to use LR, which already does it. I think I store only the plain tuple (extracted from the LR output), and not the LR output, in the hashtables, so it doesn't add any memory overhead. The LR is used only to separate out key & value, and these are stored as a mapping from key to value (plain tuples).
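The key/value split described in (2) can be sketched as follows (illustrative Python, not Pig's POLocalRearrange; the helper name is hypothetical): LR only pulls the join key out of each tuple, and the hash table stores plain value tuples keyed by that key, so no LR wrapper object occupies memory.

```python
# LocalRearrange-style split: project the key columns out of a tuple,
# then store only the plain tuple in the hash table under that key.

def local_rearrange(tup, key_cols):
    """Split a tuple into (key, value): the key is the projected
    join columns, the value is the original (plain) tuple."""
    key = tuple(tup[i] for i in key_cols)
    return key, tup

table = {}
for t in [(1, "a"), (2, "b"), (1, "c")]:
    key, value = local_rearrange(t, [0])
    table.setdefault(key, []).append(value)  # plain tuples only

assert table[(1,)] == [(1, "a"), (1, "c")]
```

This is why reusing LR adds no per-tuple memory cost: it acts only as the key extractor, and what lands in the table is the unwrapped tuple.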

        Alan Gates added a comment -

        A couple of questions:

        1) I'm still not clear on why the additional maps are needed to load the replicated inputs into files. Those inputs are already in files. Are you somehow transforming them? Isn't this exactly where we should be using the DistributedCache? Rather than having map jobs that transform them I think the best thing would be to have the MRCompiler set a flag for the JobControlCompiler to load those files into the DC for this job.

        2) You are using POLocalRearrange both in setting up the hash table and in reading the fragmented table before the join. What benefit is being derived from this? LR adds a lot of extra weight to the tuple that I don't think is needed. I suspect we could fit more tuples into memory if we loaded them directly rather than using LR.

        Shravan Matthur Narayanamurthy added a comment -

        The latest one has the fixes mentioned above. Please take a look.

        Shravan Matthur Narayanamurthy added a comment -

        (1) Have fixed this in my local branch.
        (2) You are right. I missed that one, but it's a minor fix. Have fixed it in my branch.
        (3) I was copying some code from LOCogroup and copied the comment inadvertently. There is no such restriction.
        (4) Fixed in local branch.
        (5) We do support any number of replicated tables. Have added a whole bunch of test cases to test joins of 3 tables, joins with and without schema & also to test schema computation of the frjoin. Please take a look.
        (6) Yes. As I had mentioned in one of the meetings, if the FRJoin has n inputs (1 fragmented & n-1 replicated), then there will be n-1 map jobs that materialize the n-1 replicated inputs to files so that they can then be read to construct the hash map.

        I am not submitting the patch yet because I see GC overhead limit reached exceptions even with 100MB replicated file when the vm is initialized with 1G heap space. I am still trying to figure out what is causing them. I noticed them while I was trying to figure out the limit for the size of the replicated file.

        Olga Natkovich added a comment -

        I ran tests and they all passed.

        Here are some comments on the patch:

        (1) New files should include apache header
        (2) LOFRJoin.getSchema(): I don't think nonDuplicates computation would work for more than two tables with the same column
        (3) LOFRJoin.getTupleJoinColSchema(): has a comment saying: "This doesn't work with join by complex type". Does this mean that FRJ does not work with columns of type Tuple? According to Alan, tuple columns are supported in the case of regular join. I think it is ok if the initial patch does not support it, but we should probably have a separate JIRA to track this issue.
        (4) In the grammar, you made "replicated" a token. I thought we would make it a string so as not to bloat the keyword space.
        (5) I see that the implementation seems to allow more than 2 tables, but the test cases only cover 2 tables. I am fine if we initially only support 2 tables - I just wanted to clarify the intent here.
        (6) Also, I ran explain on the following query, and the result seems to have a separate map step that I was not sure about:

        A = load '/user/pig/tests/data/singlefile/student_data' as (name, age, gpa);
        B = load '/user/pig/tests/data/singlefile/student_data' as (name, age, gpa);
        C = JOIN A by name, age, B by name, age USING replicated;
        explain C;

        --------------------------------------------------

        Map Reduce Plan

        --------------------------------------------------
        MapReduce node olgan-Wed Dec 03 14:21:35 PST 2008-57
        Map Plan
        Store(/tmp/temp921697735/tmp-320517577:org.apache.pig.builtin.BinStorage) - olgan-Wed Dec 03 14:21:35 PST 2008-58

        --Load(/user/pig/tests/data/singlefile/studenttab10k:org.apache.pig.builtin.PigStorage) - olgan-Wed Dec 03 14:21:35 PST 2008-44-------
        Global sort: false
        ----------------
        MapReduce node olgan-Wed Dec 03 14:21:35 PST 2008-56
        Map Plan
        Store(fakefile:org.apache.pig.builtin.PigStorage) - olgan-Wed Dec 03 14:21:35 PST 2008-55
        ---FRJoin[tuple] - olgan-Wed Dec 03 14:21:35 PST 2008-49
        Project[bytearray][0] - olgan-Wed Dec 03 14:21:35 PST 2008-45
        Project[bytearray][1] - olgan-Wed Dec 03 14:21:35 PST 2008-46
        Project[bytearray][0] - olgan-Wed Dec 03 14:21:35 PST 2008-47
        Project[bytearray][1] - olgan-Wed Dec 03 14:21:35 PST 2008-48
        --Load(/user/pig/tests/data/singlefile/studenttab10k:org.apache.pig.builtin.PigStorage) - olgan-Wed Dec 03 14:21:35 PST 2008-43-------
        Global sort: false

          People

          • Assignee: Shravan Matthur Narayanamurthy
          • Reporter: Shravan Matthur Narayanamurthy
          • Votes: 0
          • Watchers: 3
