Pig / PIG-1846

optimize queries like - count distinct users for each gender

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      The Pig group operation usually does not have to deal with skew on the group-by keys if the foreach statement that works on the results of the group uses only algebraic functions on the bags. But for some queries, such as the following, skew can be a problem -

      user_data = load 'file' as (user, gender, age);
      user_group_gender = group user_data by gender parallel 100;
      dist_users_per_gender = foreach user_group_gender 
                              { 
                                   dist_user = distinct user_data.user; 
                                   generate group as gender, COUNT(dist_user) as user_count;
                              }
      

      Since there are only 2 distinct values of the group-by key, only 2 reducers will actually get used in the current implementation; i.e., you can't get better performance by adding more reducers.
      A similar problem arises when the data is skewed on the group key. With the current implementation, another problem is that Pig and MR have to deal with records containing extremely large bags that hold a large number of distinct user names, which results in high memory utilization and having to spill the bags to disk.

      The query plan should be modified to handle the skew in such cases and make use of more reducers.
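
The reducer under-utilization described above can be illustrated with a small simulation. This is a minimal Python sketch, not Pig's or Hadoop's partitioner: the `partition` helper, the CRC32-based hash, and the sample records are all assumptions made for illustration.

```python
import zlib
from collections import Counter

def partition(key: str, num_reducers: int) -> int:
    # Hypothetical stand-in for a hash partitioner:
    # a given key always maps to the same reducer index.
    return zlib.crc32(key.encode("utf-8")) % num_reducers

# Hypothetical (user, gender) records; only two distinct group-by keys exist.
records = [(f"user{i % 1000}", "F" if i % 2 else "M") for i in range(10_000)]

NUM_REDUCERS = 100
# Count how many records each reducer would receive when grouping by gender.
load = Counter(partition(gender, NUM_REDUCERS) for _, gender in records)

busy_reducers = len(load)
print(f"{busy_reducers} of {NUM_REDUCERS} reducers receive data")
```

However large the requested parallelism, at most two reducers receive records here, because every record with the same gender hashes to the same reducer.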

        Activity

        Thejas M Nair created issue -
        Thejas M Nair added a comment -

        One way to mitigate the problem of skew in the above example query is to add another group-by statement that uses both gender and user as the group-by key and does a partial aggregation. It will introduce an additional MR job. The 2nd MR job will effectively use only 2 reducers, but the work that needs to be done in the reduce phase of the 2nd MR job will be very little.

        USER_DATA = load 'file' as (USER, GENDER, AGE);
        USER_GROUP_GENDER_PART = group USER_DATA by (GENDER, USER) parallel 100;
        
        -- there is only one distinct user per row since the USER column is one of the group-by columns, so just project 1 as count
        DIST_USER_PER_GENDER_PART = foreach USER_GROUP_GENDER_PART generate group.GENDER as GENDER, 1 as USER_COUNT; 
        USER_GROUP_GENDER = group DIST_USER_PER_GENDER_PART by  GENDER;
        
        -- map-side combiner will do most of the work in parallel, reduce will need to process few small records
        DIST_USER_PER_GENDER = foreach USER_GROUP_GENDER generate group as GENDER, SUM(DIST_USER_PER_GENDER_PART.USER_COUNT); 
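
The equivalence behind this two-job rewrite can be checked with a small simulation. This is a minimal Python sketch under stated assumptions: the sample rows and both helper functions are hypothetical, and plain in-memory dictionaries stand in for the MR jobs.

```python
from collections import defaultdict

# Hypothetical (user, gender, age) rows; users repeat across rows.
user_data = [
    ("alice", "F", 30), ("alice", "F", 30), ("bob", "M", 25),
    ("carol", "F", 41), ("bob", "M", 25), ("dave", "M", 52),
]

def count_distinct_direct(rows):
    # Single group-by on gender: each gender's whole bag lands on one reducer.
    users = defaultdict(set)
    for user, gender, _age in rows:
        users[gender].add(user)
    return {gender: len(names) for gender, names in users.items()}

def count_distinct_two_stage(rows):
    # Job 1: group by (gender, user); each group stands for exactly one user.
    partial = {(gender, user) for user, gender, _age in rows}
    # Job 2: group by gender and sum a 1 for every (gender, user) group.
    totals = defaultdict(int)
    for gender, _user in partial:
        totals[gender] += 1
    return dict(totals)

direct = count_distinct_direct(user_data)
two_stage = count_distinct_two_stage(user_data)
print(direct, two_stage)
```

Both functions return the same per-gender counts; the difference is that the first stage of the rewrite has many distinct keys, so its work can be spread over many reducers.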
        
        Olga Natkovich made changes -
        Field Original Value New Value
        Fix Version/s 0.10 [ 12316246 ]
        Thejas M Nair added a comment -

        For the general case, where there is skew on the group-by keys or the cardinality of the group-by keys is very low compared to the desired parallelism, the usual way of processing it -

        gby = GROUP in BY (c1, c2) PARALLEL 100;
        res = FOREACH gby GENERATE group.c1, group.c2, FUNC(distinct in.c3);
        

        can be converted to -

        dist_f = FOREACH in GENERATE c1, c2, c3;
        dist = DISTINCT dist_f PARALLEL 100;
        dist_grp = GROUP dist BY (c1, c2);
        res = FOREACH dist_grp GENERATE group.c1, group.c2, FUNC(dist.c3); -- no distinct on c3 required here 
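
The conversion above can be sanity-checked with a small simulation. This is a minimal Python sketch, assuming `sum` as a stand-in for FUNC and hypothetical sample tuples; sets and dictionaries play the role of DISTINCT and GROUP.

```python
from collections import defaultdict

# Hypothetical (c1, c2, c3) tuples with duplicates.
rows = [
    ("a", "x", 1), ("a", "x", 1), ("a", "x", 2),
    ("b", "y", 5), ("b", "y", 5), ("a", "x", 2),
]

def func(values):
    # Stand-in for FUNC; here it sums the distinct c3 values.
    return sum(values)

def group_then_distinct(rows):
    # Original plan: group by (c1, c2), then distinct c3 inside each bag.
    bags = defaultdict(set)
    for c1, c2, c3 in rows:
        bags[(c1, c2)].add(c3)
    return {key: func(values) for key, values in bags.items()}

def distinct_then_group(rows):
    # Converted plan: DISTINCT over (c1, c2, c3) first, then group by (c1, c2).
    dist = set(rows)
    bags = defaultdict(list)
    for c1, c2, c3 in dist:
        bags[(c1, c2)].append(c3)
    return {key: func(values) for key, values in bags.items()}

original = group_then_distinct(rows)
converted = distinct_then_group(rows)
print(original, converted)
```

The DISTINCT step keys on all three columns, so it is not limited by the low cardinality of (c1, c2); the later group only sees already-deduplicated rows.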
        
        Dmitriy V. Ryaboy added a comment -

        The DISTINCT optimization is often not applicable; consider, for example, a script that takes all pages on a website and generates COUNT(impressions), COUNT(distinct users). Doing the distinct operation first means we can no longer do COUNT(impressions).

        An algebraic function applied to non-distinct bags can be decomposed in this case as follows:

        gby = GROUP in BY (c1, c2) PARALLEL 100;
        res = FOREACH gby GENERATE FLATTEN(group) as (c1, c2), FUNC(distinct in.c3), ALGFUNC(in.c4);
        

        becomes

        in = FOREACH in GENERATE *, ALGFUNC$Initial(c4) as init;
        gby_dist = GROUP in BY (c1, c2, c3) PARALLEL 100;
        res_dist = FOREACH gby_dist GENERATE 
          FLATTEN(group) as (c1, c2, c3),
          ALGFUNC$Intermed(in.init) as intermed;
        gby = GROUP res_dist BY (c1, c2) PARALLEL 100;
        res = FOREACH gby GENERATE
          FLATTEN(group) as (c1, c2),
          FUNC(res_dist.c3),
          ALGFUNC$Final(res_dist.intermed);
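
The decomposition above can be traced with a small simulation. This is a minimal Python sketch, assuming COUNT for both FUNC and ALGFUNC; the `count_initial`, `count_intermed`, and `count_final` helpers and the sample rows are hypothetical stand-ins, not Pig's Algebraic interface.

```python
from collections import defaultdict

# Hypothetical three-stage split of an algebraic COUNT.
def count_initial(_value):
    return 1

def count_intermed(partials):
    return sum(partials)

def count_final(partials):
    return sum(partials)

# Hypothetical (c1, c2, c3, c4) rows: (site, page, user, impression id).
rows = [
    ("s", "home", "u1", 101), ("s", "home", "u1", 102),
    ("s", "home", "u2", 103), ("s", "about", "u1", 104),
]

# Step 1: attach Initial(c4) to each row and group by (c1, c2, c3).
by_c123 = defaultdict(list)
for c1, c2, c3, c4 in rows:
    by_c123[(c1, c2, c3)].append(count_initial(c4))

# Step 2: one row per distinct (c1, c2, c3), carrying an intermediate value.
res_dist = [(c1, c2, c3, count_intermed(p)) for (c1, c2, c3), p in by_c123.items()]

# Step 3: group by (c1, c2); c3 is already distinct within each group.
users, partials = defaultdict(list), defaultdict(list)
for c1, c2, c3, intermed in res_dist:
    users[(c1, c2)].append(c3)
    partials[(c1, c2)].append(intermed)

# Per (c1, c2): (count of distinct c3, count of all c4).
res = {key: (len(users[key]), count_final(partials[key])) for key in users}
print(res)
```

Grouping on (c1, c2, c3) first removes duplicate users while the intermediate counts preserve the total number of impressions.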
        
        Thejas M Nair added a comment -

        The DISTINCT optimization is often not applicable; consider, for example, a script that takes all pages on a website and generates COUNT(impressions), COUNT(distinct users). Doing the distinct operation first means we can no longer do COUNT(impressions).

        Yes, that optimization will not be applicable for this use case.

        The translation you proposed helps to distribute the work of computing ALGFUNC(in.c4) across multiple tasks (even when there is skew on c1,c2). But FUNC(res_dist.c3) will still get computed on the reduce side (i.e., all records for a value of c1,c2 will go to one reducer), as the combiner will not get used. This is because ALGFUNC$Final is not algebraic.

        One cumbersome workaround for the user is to write a new UDF, ALGFUNC_2, which is the same as ALGFUNC except that ALGFUNC_2$Initial is the same as ALGFUNC$Intermed. ALGFUNC_2 then gets used in the last foreach.
        Pig can automate this logic and use the combiner for the last foreach in the above example's translation.

        Thejas M Nair added a comment -

        The optimizations proposed above are applicable only in cases where the distinct happens on only one column or a single set of columns.

        For example, they are applicable for -
        res = FOREACH gby GENERATE group.c1, group.c2, FUNC(distinct in.c3);
        res = FOREACH gby GENERATE group.c1, group.c2, FUNC1(distinct in.c3), FUNC2(distinct in.c3); -- distinct on same column used in two functions
        res = FOREACH gby GENERATE group.c1, group.c2, FUNC(distinct in.(c3,c4)); -- distinct on multiple columns
        res = FOREACH gby GENERATE group.c1, group.c2, FUNC1(distinct in.(c3,c4)), FUNC2(distinct in.(c3,c4)); -- distinct on same set of multiple columns, used in two functions

        They are not applicable for -
        res = FOREACH gby GENERATE group.c1, group.c2, FUNC(distinct in.c3), FUNC(distinct in.c4); -- the two udfs have distinct on two different columns.

        FYI, the examples here also use unsupported syntax -
        res = FOREACH gby GENERATE group.c1, group.c2, FUNC(DISTINCT in.c3);
        should actually be -
        res = FOREACH gby { dist_c3 = DISTINCT in.c3; GENERATE group.c1, group.c2, FUNC(dist_c3); }
        Dmitriy V. Ryaboy added a comment -

        yeah I was just using short-hand with the distinct thing, and assumed you would know what I meant

        Is there a reason not to apply algebraic functions in an algebraic fashion when non-algebraic functions are also used in GENERATE? I think there was even a ticket to make this happen.

        In practice I often apply this optimization manually: I rewrite COUNT(distinct bar.foo), COUNT(bar) by turning the second COUNT into a sum of counts, which is essentially doing the cumbersome workaround by hand. I wonder if there is a clean way to define / use these kinds of algebraic relationships.

        Regarding two distincts – we can run the initial group-bys twice, and join?
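
The "run the group-bys twice and join" idea can be sketched with a small simulation. This is a minimal Python sketch, assuming COUNT as the function applied to each distinct column; the `distinct_count` helper and the sample rows are hypothetical.

```python
from collections import defaultdict

# Hypothetical (c1, c3, c4) rows; c1 is the group key.
rows = [
    ("k1", "a", 1), ("k1", "a", 2), ("k1", "b", 2),
    ("k2", "c", 7), ("k2", "c", 7),
]

def distinct_count(rows, key_idx, col_idx):
    # Stage 1: distinct (key, column) pairs; stage 2: count the pairs per key.
    pairs = {(row[key_idx], row[col_idx]) for row in rows}
    counts = defaultdict(int)
    for key, _value in pairs:
        counts[key] += 1
    return counts

# One two-stage pipeline per distinct column.
c3_counts = distinct_count(rows, 0, 1)
c4_counts = distinct_count(rows, 0, 2)

# Join the two per-key results on the group key.
joined = {
    key: (c3_counts[key], c4_counts[key])
    for key in c3_counts.keys() & c4_counts.keys()
}
print(joined)
```

Each distinct column gets its own skew-tolerant pipeline, and the join only has to combine one small row per group key from each side.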

        Alan Gates added a comment -

        Is there a reason not to apply algebraic functions in an algebraic fashion when non-algebraic functions are also used in GENERATE? I think there was even a ticket to make this happen.

        When we tried this in the past the performance was very bad, because you end up running all the data through the combiner (which is costly due to the (de)serialization cycles) with no resulting reduction.

        Thejas M Nair added a comment -

        yeah I was just using short-hand with the distinct thing, and assumed you would know what I meant

        I didn't realize the mistake when I wrote the example. But the shorthand is more readable; I have created PIG-2117 to discuss supporting that syntax.

        Regarding two distincts – we can run the initial group-bys twice, and join?

        Yes, that will work.

        If the udf FUNC is algebraic and FUNC.Initial() returns something that is smaller than its argument (eg, COUNT), a further optimization would be -

        in = FOREACH in GENERATE *, ALGFUNC$Initial(c4) as init;
        gby_dist = GROUP in BY (c1, c2, c3) PARALLEL 100;
        res_dist = FOREACH gby_dist GENERATE 
          group.c1, group.c2, FUNC.Initial(c3),
          ALGFUNC$Intermed(in.init) as intermed;
        gby = GROUP res_dist BY (c1, c2) PARALLEL 100;
        res = FOREACH gby GENERATE
          FLATTEN(group) as (c1, c2),
          FUNC2(res_dist.c3),
          ALGFUNC2(res_dist.intermed);
        

        Where FUNC2 and ALGFUNC2 follow the ALGFUNC_2 pattern described earlier, i.e. FUNC2.Initial is the same as FUNC.Intermed.

        Dmitriy V. Ryaboy added a comment -

        This is a subject for a different ticket, but to address Alan's comment: have we considered in-memory combiners as in Lin & Schatz: http://portal.acm.org/citation.cfm?id=1830263 ?
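
The general idea of an in-memory (in-mapper) combiner can be sketched as follows. This is a minimal Python sketch, not code from the cited paper or a Hadoop/Pig API: the `InMapperCombiningMapper` class and its `map`/`close` methods are hypothetical names chosen for illustration.

```python
from collections import defaultdict

class InMapperCombiningMapper:
    """Hypothetical mapper that accumulates partial sums in memory."""

    def __init__(self):
        self.partial = defaultdict(int)

    def map(self, key, value):
        # Aggregate in memory instead of emitting one record per input,
        # so no records are serialized for a separate combiner pass.
        self.partial[key] += value

    def close(self):
        # Emit one record per key once the mapper has consumed its split.
        return sorted(self.partial.items())

mapper = InMapperCombiningMapper()
for gender in ["F", "M", "F", "F", "M"]:
    mapper.map(gender, 1)

emitted = mapper.close()
print(emitted)
```

A real implementation would also need to bound the size of the in-memory map, for example by flushing it when it grows too large; that is omitted here.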

        Olga Natkovich made changes -
        Fix Version/s 0.10 [ 12316246 ]

          People

          • Assignee: Unassigned
          • Reporter: Thejas M Nair
          • Votes: 0
          • Watchers: 4
