PIG-3347: Store invocation brings side effect

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.11
    • Fix Version/s: 0.12.1
    • Component/s: grunt
    • Labels:
      None
    • Environment:

      local mode

    • Hadoop Flags:
      Reviewed

      Description

      The problem is that an intermediate 'store' invocation "changes" the final store output. It looks like it brings some kind of side effect. We used 'local' mode to run the script.
      Here is the input data:
      1
      1
      Here is the script:

      a = load 'test';
      
      a_group = group a by $0;
      b = foreach a_group {
        a_distinct = distinct a.$0;
        generate group, a_distinct;
      }
      --store b into 'b';
      c = filter b by SIZE(a_distinct) == 1;
      store c into 'out';
      

      We expect the output to be:
      1 1
      Instead, the output is an empty file.

      Uncomment the

      --store b into 'b';

      line and see the difference.
      You would get the expected output.
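
      For reference, here is the same script with the intermediate store uncommented; as described above, this version produces the expected '1 1':

      a = load 'test';
      a_group = group a by $0;
      b = foreach a_group {
        a_distinct = distinct a.$0;
        generate group, a_distinct;
      }
      store b into 'b';
      c = filter b by SIZE(a_distinct) == 1;
      store c into 'out';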

      1. PIG-3347-1.patch
        4 kB
        Daniel Dai
      2. PIG-3347-2-testonly.patch
        5 kB
        Koji Noguchi
      3. PIG-3347-3.patch
        7 kB
        Daniel Dai
      4. PIG-3347-4-testonly.patch
        6 kB
        Koji Noguchi
      5. PIG-3347-5.patch
        9 kB
        Daniel Dai

        Activity

        Daniel Dai added a comment -

        Patch committed to trunk and 0.12 branch. Thanks Koji for review and test!

        Koji Noguchi added a comment -

        It could, but it would introduce a lot of complications. Currently only LOForEach/LOSplitOutput deals with dup-uid; otherwise it would sprawl into all operators and all optimizer rules.

        Thanks Daniel. This helps me understand why I always get confused by this. Maybe someday I can separate the two.

        As for your latest patch (PIG-3347-5.patch), it passed the unit tests (including mine) and e2e was fine as well. I'm +1.

        Daniel Dai added a comment -

        It could, but it would introduce a lot of complications. Currently only LOForEach/LOSplitOutput deals with dup-uid; otherwise it would sprawl into all operators and all optimizer rules.

        Koji Noguchi added a comment -

        we will need to generate a new uid for col2 to avoid uid conflict (using a UDF IdentityColumn)

        Daniel, I think I understand how it is being used, but my confusion is: for the pure purpose of tracking column lineage, shouldn't a redundant uid inside the relation be allowed? Isn't the no-conflict-uid requirement coming from using the same uid for ProjectionPatcher, which serves a different purpose than lineage tracking?

        Daniel Dai added a comment -

        Koji Noguchi, in the "B = foreach A generate a as col1, a as col2;" case, we will need to generate a new uid for col2 to avoid uid conflict (using a UDF IdentityColumn). The downside is that this will break the lineage chain. The uid is mostly used in the optimizer; there are several holes when we use it for pure lineage. Optimizer rules are expected to live with these holes by skipping optimization (e.g., PushUpFilter skips a foreach with a UDF, which includes the IdentityColumn added to fix the uid conflict).
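
        As a hypothetical illustration of the case described above (the filter and final store lines are invented for this sketch and do not appear in this jira):

        A = load 'a.txt' as (a:int);
        -- col2 gets a new uid (via the IdentityColumn UDF), which breaks the lineage chain back to 'a'
        B = foreach A generate a as col1, a as col2;
        -- per the comment above, PushUpFilter is expected to skip this foreach because it contains a UDF
        C = filter B by col1 > 0;
        store C into 'c_out';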

        Daniel Dai added a comment -

        Attaching another patch which also addresses Koji's new case.

        Koji Noguchi added a comment -

        UID is to track column lineage in the logical optimizer, so that we can freely move operators up and down; ProjectionPatcher will reposition the columns according to uid

        I think part of my confusion comes from these two uses: (1) UID is used for tracking column lineage; (2) UID is also used by ProjectionPatcher to reposition columns, which requires UIDs to be unique within each relation.

        Because of (2), we see a new uid being created whenever a column is referenced multiple times.
        For example:
        A = load 'a.txt' as (a:int);
        B = foreach A generate a as col1, a as col2;

        This would create a schema like

        1-2: (Name: LOStore Schema: col1#1:int,col2#2:int)
        ...
            |---A: (Name: LOLoad Schema: a#1:int)RequiredFields:null
        

        So without traversing the lineage, I cannot connect 'col2' to the original 'a'.
        However, optimizer rules like PushUpFilter and FilterAboveForeach seem to use just the UID to determine field usage...

        But this is outside of this jira. I need to spend more time learning how the pig compiler works.

        Koji Noguchi added a comment -

        Thanks Daniel Dai.
        Adding one more testcase that I believe should push the filter before foreach.
        This one succeeds without the patch but fails with the patch.

        Daniel Dai added a comment -

        All unit tests pass with the patch.

        Daniel Dai added a comment -

        Thanks Koji Noguchi. I attached another patch which includes your test case, plus the fix for LOFilter/LOLimit, etc. Will run unit tests.

        Koji Noguchi added a comment -

        Daniel Dai, I think your testcase in the patch succeeds even without your patch, since it's missing the FilterAboveForeach optimization.

        Uploading a test-only patch with this change. Added two more test cases, for nested limit and nested filter.

        Nested distinct and nested limit are fixed with your current patch. Nested filter still fails.

        Daniel Dai added a comment -

        UID is to track column lineage in the logical optimizer, so that we can freely move operators up and down; ProjectionPatcher will reposition the columns according to uid, even if the columns get reordered. A new source of data should have a new UID. That's the case for nested LOForEach/LODistinct: since they are not directly derived from the previous operator, the result is a new field generated by the foreach.

        Julien Le Dem added a comment -

        I thought that the field UIDs were used to track lineage across the plan.
        Aniket Mokashi, correct me if I'm wrong, but it is used to determine which fields are read for projection push down.
        In the case of a self join (direct or indirect) we end up with duplicate ids in the same relation because the same field is derived into two different fields.
        Otherwise I'm as lost as Koji Noguchi regarding the actual mechanisms around the UID.
        I tried to fix some of these in the past (PIG-3020) but it appears they created more problems (PIG-3492).
        Daniel Dai, maybe you can enlighten us?

        Dmitriy V. Ryaboy added a comment -

        Yikes.

        Aniket Mokashi & Julien Le Dem, this seems like a critical bug to look at. Julien, you investigated this UID situation before, right?

        Koji Noguchi added a comment -

        I wish there was a wiki/document describing how UIDs should be assigned. Even after going through PIG-3492, I'm still lost on when exactly we should assign new UIDs.
        My current understanding (or guess) is that a UID represents uniqueness within a record.
        Just by looking at the UIDs from two separate relations (bags), we can tell whether the fields were altered or not. (Although we cannot tell whether the tuples were filtered or not.)

        FilterAboveForeach (PushUpFilter) uses this property to determine whether a FILTER can be moved before the foreach. The bug here is that nested distinct does not assign a new UID to the bag it creates, so FilterAboveForeach mistakenly thinks that no fields were altered within the foreach and decides to move the filter up front.
        The following shows the schema BEFORE calling PushUpFilter/FilterAboveForeach, without and with Daniel's patch. We can see that after applying the patch, relations 'c' and 'a_group' contain different UIDs for the bag.

        (without the patch)
        |---c: (Name: LOFilter Schema: group#11:bytearray,a_distinct#12:bag{#13:tuple(#14:bytearray)})
            |---b: (Name: LOForEach Schema: group#11:bytearray,a_distinct#12:bag{#13:tuple(#14:bytearray)})
                |---a_group: (Name: LOCogroup Schema: group#11:bytearray,a#12:bag{#13:tuple()})
        (with the patch)
        |---c: (Name: LOFilter Schema: group#15:bytearray,a_distinct#20:bag{#19:tuple(#18:bytearray)})
            |---b: (Name: LOForEach Schema: group#15:bytearray,a_distinct#20:bag{#19:tuple(#18:bytearray)})
                |---a_group: (Name: LOCogroup Schema: group#15:bytearray,a#16:bag{#17:tuple()})
        

        So I think the patch fixes the bug described in this jira nicely. However, the question remains for other nested operations. I believe the same bug can appear for nested LIMIT and nested FILTER. For example,

        a = load 'test.txt';    
        a_group = group a by $0;
        b = foreach a_group {
          a_limit = limit a.$0 5;
          generate group, a_limit;
        }
        c = filter b by SIZE(a_limit) == 5;
        store c into 'out';
        
        a = load 'test3.txt' as (a0, a1);     
        a_group = group a by a0;
        b = foreach a_group {
          newA = filter a by a1 == 2;
          generate group, newA;
        }
        c = filter b by SIZE(newA) == 5;
        store c into 'out';
        

        I confirmed that these two examples also mistakenly push the filter before the foreach and produce empty results. The former case, nested LIMIT, is actually covered by the current patch since nested LIMIT uses LOLimit+LOForEach. So the patch

        +      // If it is nested foreach or nested distinct, generate new uid
        +      if (op instanceof LOForEach || op instanceof LODistinct) {
        +          needNewUid = true;
        +      }
        

        takes care of nested limit, although the comment doesn't mention it. Nested filter is not handled here, and the bug still exists after the current patch. Can we cover this case as well?

        Koji Noguchi added a comment -

        This is a bad bug producing incorrect outputs. Taking "in local mode" out of the summary, since this happens in mapreduce mode too.

        Daniel Dai added a comment -

        That's the incorrect PushUpFilter. It can be solved by disabling the PushUpFilter rule:
        pig -t PushUpFilter -x local xxx.pig

        Look at the logical plan:

        c: (Name: LOStore Schema: group#28:bytearray,a_distinct#29:bag{#30:tuple(#31:bytearray)})
        |
        |---b: (Name: LOForEach Schema: group#28:bytearray,a_distinct#29:bag{#30:tuple(#31:bytearray)})
            |   |
            |   (Name: LOGenerate[false,false] Schema: group#28:bytearray,a_distinct#29:bag{#30:tuple(#31:bytearray)})ColumnPrune:InputUids=[29, 28]ColumnPrune:OutputUids=[29, 28]
            |   |   |
            |   |   group:(Name: Project Type: bytearray Uid: 28 Input: 0 Column: (*))
            |   |   |
            |   |   a_distinct:(Name: Project Type: bag Uid: 29 Input: 1 Column: (*))
            |   |
            |   |---(Name: LOInnerLoad[0] Schema: group#28:bytearray)
            |   |
            |   |---a_distinct: (Name: LODistinct Schema: #31:bytearray)
            |       |
            |       |---1-7: (Name: LOForEach Schema: #31:bytearray)
            |           |   |
            |           |   (Name: LOGenerate[false] Schema: #31:bytearray)
            |           |   |   |
            |           |   |   (Name: Project Type: bytearray Uid: 31 Input: 0 Column: (*))
            |           |   |
            |           |   |---(Name: LOInnerLoad[0] Schema: #31:bytearray)
            |           |
            |           |---a: (Name: LOInnerLoad[1] Schema: null)
            |
            |---c: (Name: LOFilter Schema: group#28:bytearray,a#29:bag{#36:tuple()})
                |   |
                |   (Name: Equal Type: boolean Uid: 35)
                |   |
                |   |---(Name: UserFunc(org.apache.pig.builtin.BagSize) Type: long Uid: 32)
                |   |   |
                |   |   |---a:(Name: Project Type: bag Uid: 29 Input: 0 Column: 1)
                |   |
                |   |---(Name: Cast Type: long Uid: 33)
                |       |
                |       |---(Name: Constant Type: int Uid: 33)
                |
                |---a_group: (Name: LOCogroup Schema: group#28:bytearray,a#29:bag{#36:tuple()})
                    |   |
                    |   (Name: Project Type: bytearray Uid: 28 Input: 0 Column: 0)
                    |
                    |---a: (Name: LOLoad Schema: null)RequiredFields:null
        

        The filter is pushed in front of the foreach, which is wrong.


          People

          • Assignee:
            Daniel Dai
            Reporter:
            Sergey
          • Votes:
            0
            Watchers:
            6
