Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.0
    • Component/s: impl
    • Labels:
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Allow crossing two or more bags inside a foreach statement. For example:

      user = load 'user' as (uid, age, gender, region);
      session = load 'session' as (uid, region);
      C = cogroup user by uid, session by uid;
      D = foreach C {
          crossed = cross user, session;
          generate crossed;
      }

      Description

      It is useful to have cross inside a nested foreach statement. One typical use case for a nested foreach is: after cogrouping two relations, we want to flatten the records with the same key and do some processing. This is naturally achieved by cross. E.g.:

      C = cogroup user by uid, session by uid;
      D = foreach C {
          crossed = cross user, session; -- To flatten two input bags
          filtered = filter crossed by user::region == session::region;
          result = foreach crossed generate processSession(user::age, user::gender, session::ip);  --Nested foreach Jira: PIG-1631
          generate result;
      }
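      Outside Pig, the semantics of the nested block above can be sketched in a few lines of Python. This is an illustration only, with made-up sample data and a stand-in `process_session` for the `processSession` UDF; it is not Pig's implementation.

```python
from itertools import product

# Hypothetical sample data mirroring the script above:
# user: (uid, age, gender, region); session: (uid, region, ip)
user = [(1, 25, "F", "us"), (2, 30, "M", "eu")]
session = [(1, "us", "10.0.0.1"), (1, "eu", "10.0.0.2"), (2, "eu", "10.0.0.3")]

def process_session(age, gender, ip):
    # Stand-in for the processSession UDF in the example.
    return (age, gender, ip)

# C = cogroup user by uid, session by uid;
keys = {t[0] for t in user} | {t[0] for t in session}
cogrouped = {k: ([u for u in user if u[0] == k],
                 [s for s in session if s[0] == k]) for k in keys}

# D = foreach C { crossed = cross ...; filtered = filter ...; generate ... }
result = []
for k, (ubag, sbag) in sorted(cogrouped.items()):
    crossed = list(product(ubag, sbag))            # nested cross
    filtered = [(u, s) for u, s in crossed
                if u[3] == s[1]]                   # user::region == session::region
    result.extend(process_session(u[1], u[2], s[2]) for u, s in filtered)

print(result)
```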
      

      If we don't have cross, the user has to write a UDF that processes the bags user and session, which is much harder than a UDF that processes flattened tuples. This is especially true when we have nested foreach statements (PIG-1631).

      This is a candidate project for Google Summer of Code 2011. More information about the program can be found at http://wiki.apache.org/pig/GSoc2011

      1. PIG-1916_1.patch
        10 kB
        Zhijie Shen
      2. PIG-1916_2.patch
        31 kB
        Zhijie Shen
      3. PIG-1916_3.patch
        31 kB
        Zhijie Shen
      4. PIG-1916_4.patch
        38 kB
        Zhijie Shen
      5. PIG-1916_5.patch
        39 kB
        Daniel Dai

        Issue Links

          Activity

          Zhijie Shen added a comment -

          Thanks for your help, Daniel!

          Daniel Dai added a comment -

          TestScriptLanguage fails, but the failure is not caused by this patch; I will open a separate Jira to fix the test case. Test-patch shows several more javac warnings, all of them in Antlr-generated code.

          Patch committed to trunk. Congratulations, Zhijie!

          Daniel Dai added a comment -

          Changed the patch slightly to fix test-patch warnings.

          Daniel Dai added a comment -

          Hi, Zhijie,
          Patch looks pretty good. The simple test case you use is fine, and the local mode test should be good enough. I see you also tested the case where one side is an empty bag, and it works as expected. Illustrate also works. I think the patch is good to go.

          One future improvement: we can optimize the bag creation by materializing n-1 bags instead of n. For the last relation, we can iterate tuple by tuple and create results on the fly, so we use less memory. We can open a separate Jira ticket for this optimization.
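          The optimization suggested above can be sketched in Python. This is an illustrative generator, not Pig's POCross code: only the first n-1 bags are materialized, and the last relation is streamed tuple by tuple.

```python
from itertools import product

def cross_bags(bags):
    """Cross n bags while materializing only the first n-1.

    Illustrative sketch of the suggested optimization: the last bag is
    consumed tuple by tuple and results are produced on the fly.
    """
    *head, last = bags
    head = [list(b) for b in head]   # materialize n-1 bags in memory
    for t in last:                   # stream the last relation
        for combo in product(*head):
            yield combo + (t,)

out = list(cross_bags([[1, 2], ["a", "b"], ["x"]]))
```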

          Zhijie Shen added a comment -

          Hi Daniel,

          I've completed the test cases. The slight difference between my patch and what you proposed is in "1. Simple case (the one on Jira)". I didn't use that sample directly because it contains a command that hasn't been implemented yet. Instead, I modified the sample down to the simplest case:
          C = cogroup user by uid, session by uid;
          D = foreach C

          { crossed = cross user, session; generate crossed; }

          One more question: do we need to run the same unit tests in a mini-cluster environment (they currently run in the local environment)?

          Generally, I think the patch for this issue is close to submission. What do you think?

          Zhijie Shen added a comment -

          The candidate patch for submission.

          Zhijie Shen added a comment -

          Hi Daniel,

          Yes, I'm currently preparing the test cases. However, instead of writing a separate test suite, may I add the cases to TestForEachNestedPlanLocal and TestForEachNestedPlan, which already contain the cases for the other nested operators?

          Daniel Dai added a comment -

          Hi, Zhijie,
          I browsed through the patch; it looks good to me. I will certainly look into more details later, but in the meantime, can you add test cases to the patch? We need to test:
          1. Simple case (the one on Jira)
          2. More than two inputs in cogroup
          3. There is statement before cross (eg, do a filter, then do cross)

          You can follow an existing test (e.g., TestEvalPipeline2) to write a separate test suite.

          Zhijie Shen added a comment -

          The nested cross functionality basically works. I've tested crossing one, two and three bags, on both a local machine and a single-node Hadoop platform. I've also fixed the bug of cross over a null bag: if an input bag is found to be null, POCross returns a null result regardless of the other inputs, because the cross product of null and anything is null. Moreover, I refactored POCross a bit and implemented the illustratorMarkup() function, which is the part I'm still not sure about.

          Afterwards, I'll come up with comprehensive test cases and investigate illustratorMarkup().
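          The null-input behavior described above (null in, null out) can be sketched in Python. `nested_cross` is a hypothetical helper for illustration, not Pig's POCross code.

```python
from itertools import product

def nested_cross(*bags):
    # Hypothetical helper mirroring the described behavior: a null
    # (None) input bag yields a null result; otherwise return the
    # full cross product of all input bags.
    if any(b is None for b in bags):
        return None
    return list(product(*bags))

print(nested_cross([1, 2], None))   # None: null in, null out
print(nested_cross([1, 2], [3]))    # [(1, 3), (2, 3)]
```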

          Below is an example. For the following sample commands,

          #-----------------------------------------------
          user = load 'user.txt' using PigStorage('\t') as (uid, region);
          session = load 'session.txt' using PigStorage('\t') as (uid, region, duration);
          C = cogroup user by uid, session by uid;
          D = foreach C

          { crossed = cross user, session; generate crossed; }

          store D into 'test.out';
          #-----------------------------------------------

          I got the logical, physical and map/reduce plans, which are shown below.

          #-----------------------------------------------

          New Logical Plan:
            #-----------------------------------------------
            D: (Name: LOStore Schema: crossed#22:bag {#23:tuple(user::uid#16:bytearray,user::region#17:bytearray,session::uid#18:bytearray,session::region#19:bytearray,session::duration#20:bytearray)}

            )

            ---D: (Name: LOForEach Schema: crossed#22:bag {#23:tuple(user::uid#16:bytearray,user::region#17:bytearray,session::uid#18:bytearray,session::region#19:bytearray,session::duration#20:bytearray)}

            )

             
            (Name: LOGenerate[false] Schema: crossed#22:bag {#23:tuple(user::uid#16:bytearray,user::region#17:bytearray,session::uid#18:bytearray,session::region#19:bytearray,session::duration#20:bytearray)}

            )

               
              crossed:(Name: Project Type: bag Uid: 22 Input: 0 Column: )
             
              ---crossed: (Name: LOCross Schema: user::uid#16:bytearray,user::region#17:bytearray,session::uid#18:bytearray,session::region#19:bytearray,session::duration#20:bytearray)
             
              ---user: (Name: LOInnerLoad[1] Schema: uid#16:bytearray,region#17:bytearray)
             
              ---session: (Name: LOInnerLoad[2] Schema: uid#18:bytearray,region#19:bytearray,duration#20:bytearray)
            ---C: (Name: LOCogroup Schema: group#21:bytearray,user#22:bag {#28:tuple(uid#16:bytearray,region#17:bytearray)}

            ,session#24:bag

            {#29:tuple(uid#18:bytearray,region#19:bytearray,duration#20:bytearray)}

            )

             
            uid:(Name: Project Type: bytearray Uid: 16 Input: 0 Column: 0)
             
            uid:(Name: Project Type: bytearray Uid: 18 Input: 1 Column: 0)
            ---user: (Name: LOLoad Schema: uid#16:bytearray,region#17:bytearray)RequiredFields:null
            ---session: (Name: LOLoad Schema: uid#18:bytearray,region#19:bytearray,duration#20:bytearray)RequiredFields:null

          #-----------------------------------------------

          Physical Plan:
            #-----------------------------------------------
            D: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-25
            ---D: New For Each(false)[bag] - scope-24
             
            RelationToExpressionProject[bag][*] - scope-20
             
              ---POCross[bag] - scope-23
             
              ---Project[bag][1] - scope-21
             
              ---Project[bag][2] - scope-22
            ---C: Package[tuple] {bytearray} - scope-15
            |
            |---C: Global Rearrange[tuple] - scope-14
            |
            |---C: Local Rearrange[tuple]{bytearray}

            (false) - scope-16

               
              Project[bytearray][0] - scope-17
             
              ---user: New For Each(false,false)[bag] - scope-5
               
              Project[bytearray][0] - scope-1
               
              Project[bytearray][1] - scope-3
             
              ---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:PigStorage(' ')) - scope-0
            ---C: Local Rearrange[tuple] {bytearray}(false) - scope-18
            | |
            | Project[bytearray][0] - scope-19
            |
            |---session: New For Each(false,false,false)[bag] - scope-13
            | |
            | Project[bytearray][0] - scope-7
            | |
            | Project[bytearray][1] - scope-9
            | |
            | Project[bytearray][2] - scope-11
            |
            |---session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:PigStorage(' ')) - scope-6

            #--------------------------------------------------
            # Map Reduce Plan
            #--------------------------------------------------
            MapReduce node scope-28
            Map Plan
            Union[tuple] - scope-29
            |
            |---C: Local Rearrange[tuple]{bytearray}

            (false) - scope-16

               
              Project[bytearray][0] - scope-17
             
              ---user: New For Each(false,false)[bag] - scope-5
               
              Project[bytearray][0] - scope-1
               
              Project[bytearray][1] - scope-3
             
              ---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:PigStorage(' ')) - scope-0
            ---C: Local Rearrange[tuple] {bytearray}(false) - scope-18
            | |
            | Project[bytearray][0] - scope-19
            |
            |---session: New For Each(false,false,false)[bag] - scope-13
            | |
            | Project[bytearray][0] - scope-7
            | |
            | Project[bytearray][1] - scope-9
            | |
            | Project[bytearray][2] - scope-11
            |
            |---session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:PigStorage(' ')) - scope-6--------
            Reduce Plan
            D: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-25
            |
            |---D: New For Each(false)[bag] - scope-24
            | |
            | RelationToExpressionProject[bag][*] - scope-20
            | |
            | |---POCross[bag] - scope-23
            | |
            | |---Project[bag][1] - scope-21
            | |
            | |---Project[bag][2] - scope-22
            |
            |---C: Package[tuple]{bytearray}

            - scope-15--------
            Global sort: false
            ----------------

          Zhijie Shen added a comment -

          As Daniel suggested, I differentiated the handling of LOCross: for a top-level LOCross, I keep the original logic; for a nested LOCross, I generate a POCross instead and place it at the correct position in the physical plan. After that, I found the physical and map/reduce plans are generated correctly. However, the pipeline was still broken because the getNext() function of POCross hadn't been implemented. I recall you mentioned there was a POCross in a previous version of Pig; I found that code and reused its getNext() function. It almost worked! There were some bugs, which I fixed. Finally, the nested cross basically works. Please refer to the newest patch for the code update.

          Daniel Dai added a comment -

          Here are my review comments:
          1. The parser change is good; LOCross is correct in the logical plan.
          2. The logical plan is still good after the optimizer.
          3. The physical plan is not good: we should have POCross in the physical plan. The current physical plan shows "Global Rearrange" + "Local Rearrange" + "GFCross", which is right for a top-level cross, but not for a nested cross.

          Here is the suggested change:
          1. Add an inner flag to LOCross.
          2. Change LogToPhyTranslationVisitor: for a top-level cross, keep the old logic; for a nested cross, convert LOCross to POCross.

          Zhijie Shen added a comment -

          Incomplete patch to show progress.

          Zhijie Shen added a comment -

          A report of my progress and the issues I've met.

          First, by adding the nested cross syntax (i.e., changing the grammar source files and the hooked functions), Pig can now accept the nested cross statement, generate the logical plan and reach the step of translating the logical plan into the physical plan. The major issue here is multiple inputs. The existing nested operators have only one input, so the current grammar seems to assume one input for nested operators. However, cross can accept multiple inputs, so the input syntax in the nested block has to be changed as well.

          How to translate the nested LOCross into the correct physical operators looks like the essential part of this project. I've spent time investigating the translation, as well as map/reduce plan compilation and map/reduce job execution.

          During the stage of logical plan generation, both top-level and nested cross statements result in an LOCross instance. LogToPhyTranslationVisitor already has a function to visit LOCross. However, that function works for the top-level cross, not the nested one. To see why it doesn't work for a nested cross, here are the two map/reduce plans generated for the top-level and the nested cross operators, respectively:

          1. top-level:
          user = load 'user.txt' as (uid, region);
          session = load 'session.txt' as (uid, region, duration);
          A = group user by uid;
          B = group session by uid;
          C = cross A, B;
          store C into 'test.out';

          #--------------------------------------------------

          Map Reduce Plan
            #--------------------------------------------------
            MapReduce node scope-38
            Map Plan
            Union[tuple] - scope-39
            ---C: Local Rearrange[tuple] {tuple}(false) - scope-21
            | | |
            | | Project[int][0] - scope-22
            | | |
            | | Project[int][1] - scope-23
            | |
            | |---C: New For Each(true,true)[tuple] - scope-20
            | | |
            | | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-18
            | | |
            | | |---Constant(2) - scope-16
            | | |
            | | |---Constant(0) - scope-17
            | | |
            | | Project[tuple][*] - scope-19
            | |
            | |---user: New For Each(false,false)[bag] - scope-5
            | | |
            | | Project[bytearray][0] - scope-1
            | | |
            | | Project[bytearray][1] - scope-3
            | |
            | |---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:org.apache.pig.builtin.PigStorage) - scope-0
            |
            |---C: Local Rearrange[tuple]{tuple}

            (false) - scope-29

             
            Project[int][0] - scope-30
             
            Project[int][1] - scope-31
            ---C: New For Each(true,true)[tuple] - scope-28
             
            POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-26
             
              ---Constant(2) - scope-24
             
              ---Constant(1) - scope-25
             
            Project[tuple][*] - scope-27
            ---session: New For Each(false,false,false)[bag] - scope-13
             
            Project[bytearray][0] - scope-7
             
            Project[bytearray][1] - scope-9
             
            Project[bytearray][2] - scope-11
            --session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:org.apache.pig.builtin.PigStorage) - scope-6-------
            Reduce Plan
            C: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-35
            --POJoinPackage(true,true)[tuple] - scope-40-------
            Global sort: false
            ----------------

          2. nested:
          user = load 'user.txt' as (uid, region);
          session = load 'session.txt' as (uid, region, duration);
          C = cogroup user by uid, session by uid;
          D = foreach C

          { crossed = cross user, session; filtered = filter crossed by user::region == session::region; generate group, user, session; }

          ;
          store D into 'test.out';

          #--------------------------------------------------

          Map Reduce Plan
            #--------------------------------------------------
            MapReduce node scope-52
            Map Plan
            Union[tuple] - scope-53
            ---C: Local Rearrange[tuple] {bytearray}(false) - scope-16
            | | |
            | | Project[bytearray][0] - scope-17
            | |
            | |---user: New For Each(false,false)[bag] - scope-5
            | | |
            | | Project[bytearray][0] - scope-1
            | | |
            | | Project[bytearray][1] - scope-3
            | |
            | |---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:org.apache.pig.builtin.PigStorage) - scope-0
            |
            |---C: Local Rearrange[tuple]{bytearray}

            (false) - scope-18

             
            Project[bytearray][0] - scope-19
            ---session: New For Each(false,false,false)[bag] - scope-13
             
            Project[bytearray][0] - scope-7
             
            Project[bytearray][1] - scope-9
             
            Project[bytearray][2] - scope-11
            --session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:org.apache.pig.builtin.PigStorage) - scope-6-------
            Reduce Plan
            D: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-49
            ---D: New For Each(false)[bag] - scope-48
             
            RelationToExpressionProject[bag][*] - scope-20
             
              ---filtered: Filter[bag] - scope-44
               
              Equal To[boolean] - scope-47
               
                ---Project[bytearray][1] - scope-45
               
                ---Project[bytearray][3] - scope-46
             
              ---crossed: New For Each(true,true)[tuple] - scope-43
               
              Project[bag][1] - scope-41
               
              Project[bag][2] - scope-42
             
              ---Package[tuple] {tuple} - scope-24
            | |
            | |---crossed: Global Rearrange[tuple] - scope-23
            | |
            | |---crossed: Local Rearrange[tuple]{tuple}

            (false) - scope-30

                 
                Project[int][0] - scope-31
                 
                Project[int][1] - scope-32
               
                ---crossed: New For Each(true,true)[tuple] - scope-29
                 
                POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-27
                 
                  ---Constant(2) - scope-25
                 
                  ---Constant(0) - scope-26
                 
                Project[tuple][*] - scope-28
               
                ---Project[bag][1] - scope-21
             
              ---crossed: Local Rearrange[tuple] {tuple}

            (false) - scope-38

               
              Project[int][0] - scope-39
               
              Project[int][1] - scope-40
             
              ---crossed: New For Each(true,true)[tuple] - scope-37
               
              POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-35
               
                ---Constant(2) - scope-33
               
                ---Constant(1) - scope-34
               
              Project[tuple][*] - scope-36
             
              ---Project[bag][2] - scope-22
            ---C: Package[tuple] {bytearray}

            - scope-15--------
            Global sort: false
            ----------------

          The difference in the physical plans is not obvious, because the translation procedure is the same: local rearrange -> global rearrange -> package -> physical foreach. However, when the physical plan is translated into the map/reduce plan, the difference becomes obvious. The physical operators belonging to the top-level cross are distributed across both the map and reduce stages: local rearrange is placed in the map; global rearrange is removed because its logic is inherently provided by map/reduce; package and foreach are grouped together to form a POJoinPackage, which is placed in the reduce stage.

          On the other hand, the physical plan of the nested cross cannot be translated into the same kind of map/reduce plan. This is because the physical operators of nested commands cannot be distributed across the map and reduce stages; they have to be executed locally within a single stage. Hence the problem: the global rearrange and package operators appear inside the reduce stage. First, the global rearrange is assumed to be replaced by Hadoop's merge/shuffle, so its data-processing logic is not implemented. Second, package reads data from Hadoop's reduce function, and in the current implementation only one package can appear in the reduce stage, because the keyInfo member of only one POPackage instance will be set.

          Given these issues, when visiting LOCross in LogToPhyTranslationVisitor, we should distinguish whether it is a top-level or a nested cross (which perhaps requires modification of the logical plan). If it is a nested cross, a different operator (temporarily named POCross) needs to be generated to replace POGlobalRearrange and POPackage. It should achieve similar functionality, but operate locally. Afterwards, MRCompiler needs a function to visit the POCross instance and attach it to the reduce stage.
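          The local semantics such a POCross would have to compute is simple: after the cogroup, each record already holds one bag per input, so the operator can pair the bags' tuples in memory with no rearrange/package machinery at all. A minimal sketch of that semantics (illustrative only — a real POCross would stream tuples through Pig's physical-operator interface, and the bag contents below are made up):

```python
from itertools import product

def nested_cross(bags):
    """Yield the flattened concatenation of one tuple from each bag: the cross product."""
    for combo in product(*bags):
        yield tuple(field for tup in combo for field in tup)

# One cogrouped record's bags for key 'u1' (hypothetical data).
user_bag = [("u1", 25), ("u1", 30)]          # (uid, age)
session_bag = [("u1", "us"), ("u1", "eu")]   # (uid, region)

crossed = list(nested_cross([user_bag, session_bag]))
print(len(crossed))  # 4: 2 users x 2 sessions, e.g. ('u1', 25, 'u1', 'us')
```

          Since everything happens within a single record's bags, this fits inside the reduce stage without a second global rearrange or a second POPackage.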

          Attached is the code I've modified up till now.

          Zhijie Shen added a comment -

          I've updated my application.

          Zhijie Shen added a comment -

          Here is the draft of my GSoC'11 application for this issue:

          http://www.google-melange.com/gsoc/proposal/review/google/gsoc2011/zjshen/1

          I'll go on to polish it. Comments are welcome. Thanks!

          Daniel Dai added a comment -

          In 0.6, local mode made a big difference. Starting with 0.7, local mode leverages Hadoop's local mode, so there is not much difference.

          Zhijie Shen added a comment -

          I found POCross.java in branch-0.6. Together with POCross, the other classes in "org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperator" have been removed in the latest version as well. I looked into "MRCompiler.java" and found that "pigContext.getExecType()" is seldom checked in the visitXXXX functions. So how does Pig distinguish between local execution and Hadoop execution? I'm a bit confused here.

          Daniel Dai added a comment -

          Yes, you are right. In the 0.8 branch we have POCross, but we dropped it in trunk. That POCross was for another purpose, and may or may not be usable for this project. We need a new POCross to implement the nested cross behavior.

          Zhijie Shen added a comment -

          Hi Daniel,

          I browsed through the code, but I didn't find a class named POCross. I think it's not only deprecated but also removed from the code. Moreover, after reading the code, I'm still not clear on why you said the top-level logical cross should be translated into GFCross while the nested logical cross should use POCross instead. What is the difference?

          Regards,
          Zhijie

          Daniel Dai added a comment -

          The GSoC wiki page of Pig does provide guidelines on how to write a GSoC application. There are no absolute criteria. The chance of acceptance depends on how many students apply, how much quota we get, and your rank among all the students.

          I assume I will mentor this issue, unless we get an overwhelming number of projects or someone else is more interested in mentoring.

          Zhijie Shen added a comment -

          Hi Daniel,

          thank you very much for your response. It will definitely help me understand the code. Besides, could you please give some tips on getting through the GSoC application process with the ASF — for example, the criteria used to evaluate a student? The GSoC wiki page of Pig provides quite limited information.

          BTW, I assume you would be the potential mentor for this issue, right?

          Regards,
          Zhijie

          Daniel Dai added a comment -

          Yes, you will need to change LogicalPlanGenerator.g and add buildNestedCrossOp so that Pig can recognize the nested operator and generate the logical plan correctly. This is the first step. There are other places that need to change as well, for example:
          1. LogToPhyTranslationVisitor.java, which translates the logical plan to the physical plan. It currently does not recognize nested cross.
          2. Also in LogToPhyTranslationVisitor.java, Pig translates the top-level "cross" into the UDF GFCross. Nested cross will use a different implementation, so we should translate it into a different physical plan (using POCross).
          3. MRCompiler.java, which does not know how to handle POCross yet.
          4. Pipeline execution. I hope POCross will work, but this needs review (a little background: POCross used to implement Pig's local mode; however, it was dropped in Pig 0.7 when we moved to Hadoop's local mode. Currently no one is using POCross — hopefully it is still functional).

          Zhijie Shen added a comment -

          I've just had a quick glance at the code, and found that the starting point for this issue could be LogicalPlanGenerator.g, where the current grammar doesn't accept the nested cross operator. Meanwhile, the LogicalPlanBuilder class lacks a buildNestedCrossOp function as well. Hence, to add nested cross, additional code is first required in the aforementioned parts of Pig. Please correct me if I'm wrong. Thanks! I'm going on to explore more parts related to this issue.

          Zhijie Shen added a comment -

          Hi developers,

          I'm a graduate student interested in big data, and I'd like to apply for GSoC with Pig. I've already built Pig locally and read two papers about Pig published at SIGMOD and VLDB. Now I'm investigating this issue and the related PIG-1631, which seem to be interesting additions to Pig. However, there's quite limited information here. Could anybody give me some hints on where I should start in the Pig code to deal with this issue?


            People

            • Assignee: Zhijie Shen
            • Reporter: Daniel Dai
            • Votes: 0
            • Watchers: 2
