Pig / PIG-2829

Use partial aggregation more aggressively

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.10.0
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      Partial aggregation (hash aggregation, also known as the in-map combiner) is a new feature in Pig 0.10 that performs aggregation within the map function. Its main advantages over the combiner are that it avoids serializing, deserializing, and sorting the data, and that it can disable itself automatically if the data reduction rate is low. It is currently disabled by default.

      To leverage the power of PartialAgg more aggressively, several things need to be revisited:

      1. The threshold for auto-disabling. Currently each mapper looks at the first 1k records (hard-coded) to see whether there is enough data size reduction (the default is 10x, configurable). The check happens earlier if the hash table fills up before 1k records have been processed (the hash table size is controlled by pig.cachedbag.memusage). We might want to relax these thresholds.

      2. Dependency on the combiner. Currently PartialAgg won't work without a combiner following it, so we need to provide separate options to enable each independently.
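
To make the mechanism described above concrete, here is a minimal Python sketch of in-map hash aggregation with a one-time auto-disable check. It is not Pig's implementation: the function name, the `max_entries` stand-in for the memory limit, and the choice of SUM as the aggregate are all assumptions made for illustration. Only the 1k-record check and the 10x default come from the description.

```python
from collections import defaultdict


def map_with_partial_agg(records, min_reduction=10, check_after=1000, max_entries=None):
    """Sum values per key inside the map phase, with a one-time auto-disable check.

    records: iterable of (key, value) pairs.
    max_entries: hypothetical cap on the hash table size (stands in for a memory limit).
    Returns (output_pairs, still_enabled).
    """
    table = defaultdict(int)  # in-memory hash table of partial sums
    output = []               # pairs the map task emits downstream
    enabled = True
    checked = False
    seen = 0

    for key, value in records:
        if not enabled:
            output.append((key, value))  # pass through without aggregating
            continue

        table[key] += value
        seen += 1

        table_full = max_entries is not None and len(table) >= max_entries
        if not checked and (seen >= check_after or table_full):
            checked = True
            # Reduction = records consumed so far / distinct keys held in the table.
            if seen / len(table) < min_reduction:
                enabled = False
        if not enabled or table_full:
            output.extend(table.items())  # flush partial results
            table.clear()

    output.extend(table.items())  # emit whatever is still buffered
    return output, enabled
```

With ten distinct keys the check passes and only ten pairs are emitted; with all-distinct keys the check fails after the first 1k records and the remaining records pass through untouched.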

      1. 2829.1.patch
        11 kB
        Jie Li
      2. 2829.2.patch
        20 kB
        Jie Li
      3. 2829.separate.options.patch
        4 kB
        Jie Li
      4. pigmix-10G.png
        114 kB
        Jie Li
      5. tpch-10G.png
        100 kB
        Jie Li

          Activity

          Jie Li added a comment -

          No problem Dmitriy. I'll see if I can find some time over the weekend.

          Dmitriy V. Ryaboy added a comment -

          Jie, sorry I missed this ticket before. As you may have seen, I completely reimplemented this whole chunk of code in PIG-2888. Can you rerun your benchmarks and see if some of the improvements you propose here should be applied to the code developed in that ticket?

          Jie Li added a comment -

          Generated 100G tpc-h data with reduction rate 2 and 3 respectively. For each dataset, ran two queries: group-by with 8 aggregations and group-by with 1 aggregation:

          query                                       | combiner off, partial-agg off | combiner off, partial-agg on
          --------------------------------------------+-------------------------------+-----------------------------
          g-by with reduction by 3 and 8 aggregations | 47m59s                        | 47m46s
          g-by with reduction by 2 and 8 aggregations | 48m39s                        | 57m3s
          g-by with reduction by 3 and 1 aggregations | 23m37s                        | 20m52s
          g-by with reduction by 2 and 1 aggregations | 24m11s                        | 24m36s

          From the results we can see that the minimum reduction rate for partial-agg is not trivial to decide: it depends on the cost of performing the reduction (number of aggregations, cost of each aggregation, etc.) and on the cost of transferring the data (the amount of data to transfer, the network traffic, etc.). It's like compression: performance is a trade-off between CPU and I/O, and is application-dependent. For the default value, 3 will give a more significant improvement, while 2 will save more network traffic. Any comments?
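
For readers who want the relative differences rather than raw timings, the following Python sketch converts the durations reported above to seconds and computes the percentage change from enabling partial-agg. The helper names are made up for this example; the timings are copied from the benchmark table and nothing else is assumed.

```python
def to_seconds(text):
    """Convert a duration such as '47m59s' to seconds."""
    minutes, rest = text.split("m")
    return int(minutes) * 60 + int(rest.rstrip("s"))


def percent_change(baseline, candidate):
    """Relative change of candidate versus baseline, in percent (negative means faster)."""
    b, c = to_seconds(baseline), to_seconds(candidate)
    return round(100.0 * (c - b) / b, 1)


# (partial-agg off, partial-agg on), copied from the benchmark table
results = {
    "reduction 3, 8 aggregations": ("47m59s", "47m46s"),
    "reduction 2, 8 aggregations": ("48m39s", "57m3s"),
    "reduction 3, 1 aggregation": ("23m37s", "20m52s"),
    "reduction 2, 1 aggregation": ("24m11s", "24m36s"),
}

changes = {name: percent_change(off, on) for name, (off, on) in results.items()}
for name, change in changes.items():
    print(f"{name}: {change:+.1f}%")
```

On these numbers, a reduction rate of 3 is roughly neutral with eight aggregations and about 12% faster with one, while a reduction rate of 2 is about 17% slower with eight aggregations and about 2% slower with one.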

          Thejas M Nair added a comment -

          Thanks for the benchmark Jie. Clearly, partial-agg is working better than combiner.
          Can you also run some benchmarks with combiner turned off, so that we can verify the appropriate value for pig.exec.mapPartAgg.minReduction:

          query                    | combiner off, partial-agg off | combiner off, partial-agg on
          -------------------------+-------------------------------+-----------------------------
          g-by with reduction by 3 |                               |
          g-by with reduction by 2 |                               |
          Jie Li added a comment -

          Updated the patch with unit test fixes and new unit tests verifying default configurations.

          Below are the benchmark results on a 4-slave cluster with 100GB of TPC-H data. Query 1 and some synthetic queries are used. Each query uses 300 map tasks and 79 reduce tasks, and each map task processes 2 million records:

          query         | trunk                 | patch                | comment
          --------------+-----------------------+----------------------+--------
          TPCH Q1       | 58 min                | 34 min               | Q1's group-by has four different keys and eight aggregations.
          S-600x        | 35 min                | 30 min               | The reduction rate of output/input records is 600.
          S-4x          | 31 min                | 21 min               | The reduction rate of output/input records is 4.
          S-1x          | 59 min                | 44 min               | The reduction rate of output/input records is 1. Every group-by key is different.
          S-high memory | map task 5min ~ 6 min | map task 2min ~ 3min | reduction rate is 1 (no reduction). 16 aggregations in the same group.

          We can see that the new default settings in this patch always perform better than the old default settings in trunk.

          Also tested the latency of disabling MapAgg using the query S-1x (no reduction). There's almost no difference:

          pig.exec.mapPartAgg.reduction.checkinterval | job running time
          --------------------------------------------+-----------------
          1000                                        | 43 min 54 sec
          100000                                      | 43 min 46 sec
          Jie Li added a comment -

          > Can you try these settings with queries where there are around 10+ group+agg that get combined into a single MR job?

          Sure. The TPC-H Q1 I ran before has 8 aggregations. I'll double the number of aggregations and also change the group-by key so that every hash map gets full, so we can identify whether there's any memory issue.

          > Can you do some benchmarks to see if there is any noticeable difference in runtime because of the delay in turning mapPartAgg off?

          Yeah, will compare these two settings.

          Thejas M Nair added a comment -

          I will review the patch soon. Some comments regarding the default configuration:

          > 2: changes existing default values:

          After thinking about the multi-query use case, where you can have multiple POPartialAgg operators in a map task, I am having second thoughts about turning partial agg on by default. Can you try these settings with queries where there are around 10+ group+agg that get combined into a single MR job? Maybe we should address the potential OOM issues for this use case before we change the defaults. This is likely to become a bigger issue when we use 100k records to decide whether to turn partial aggregation on or off.

          > 3: adds a property pig.exec.mapPartAgg.reduction.checkinterval which defaults to 100k, so after processing every 100k records mapagg will check the reduction rate to see if it should be disabled. Previously we only looked at the first 1000 records.

          Can you do some benchmarks to see if there is any noticeable difference in runtime because of the delay in turning mapPartAgg off?

          Jie Li added a comment -

          Attached patch with all the rework (mostly learnt from Hive):

          1: separates options to enable combiner and mapagg

          2: changes existing default values:

          property                         | old default value | new default value | comment
          ---------------------------------+-------------------+-------------------+--------
          pig.exec.nocombiner              | false             | true              | disable combiner by default
          pig.exec.mapPartAgg              | false             | true              | enable mapagg by default
          pig.exec.mapPartAgg.minReduction | 10                | 2.0               | more aggressive. also change from int to double

          3: adds a property pig.exec.mapPartAgg.reduction.checkinterval which defaults to 100k, so after processing every 100k records mapagg will check the reduction rate to see if it should be disabled. Previously we only looked at the first 1000 records.

          4: previously the reduction check would also happen if the hash map gets full. The patch removes this condition and instead it keeps track of the total new hash map entries, so the reduction check will only be triggered by pig.exec.mapPartAgg.reduction.checkinterval, which is easier to control.

          Comments are welcome! Will work on fixing unit tests and performance testing.
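
The periodic check in items 3 and 4 can be sketched roughly as below. This is a hedged Python illustration, not the patch itself: the class and method names are hypothetical, and the choice to compute the ratio cumulatively (total records seen divided by total new hash map entries) is an assumption, since the comment does not say whether the counters are reset after each interval.

```python
class PeriodicReductionCheck:
    """Decide, every check_interval records, whether map-side aggregation stays on."""

    def __init__(self, min_reduction=2.0, check_interval=100_000):
        self.min_reduction = min_reduction
        self.check_interval = check_interval
        self.records_seen = 0
        self.new_entries = 0  # total number of new hash map entries created so far
        self.enabled = True

    def observe(self, created_new_entry):
        """Account for one input record; return whether aggregation is still enabled."""
        if not self.enabled:
            return False
        self.records_seen += 1
        if created_new_entry:
            self.new_entries += 1
        # The check is triggered only by the interval, never by the table filling up.
        if self.records_seen % self.check_interval == 0:
            reduction = self.records_seen / max(self.new_entries, 1)
            if reduction < self.min_reduction:
                self.enabled = False
        return self.enabled


def run(keys, **kwargs):
    """Feed a stream of group-by keys through the check and return the final state."""
    check = PeriodicReductionCheck(**kwargs)
    seen_keys = set()
    for key in keys:
        is_new = key not in seen_keys
        seen_keys.add(key)
        check.observe(is_new)
    return check
```

Because only the record counter triggers the check, the point at which aggregation can be switched off depends on a single setting rather than on how quickly the hash map fills.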

          Jie Li added a comment -

          Attached an initial patch that separates options for enabling combiner and mapagg. Now both combiner and mapagg will trigger the CombinerOptimization, and the combiner plan will be removed if the combiner is not enabled.
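
As a rough illustration of the decision logic described in this comment, the Python sketch below maps the two switches to what ends up in the plan. The property names pig.exec.nocombiner and pig.exec.mapPartAgg appear elsewhere in this issue; the function name, the dictionary-based configuration, and the returned field names are assumptions made for the example and do not reflect Pig's actual classes.

```python
def plan_aggregation(props):
    """Return which aggregation stages a job would use for the given properties.

    props: dict of property name to string value. Missing keys fall back to
    the pre-patch defaults (combiner on, mapagg off).
    """
    no_combiner = props.get("pig.exec.nocombiner", "false").lower() == "true"
    map_agg = props.get("pig.exec.mapPartAgg", "false").lower() == "true"
    use_combiner = not no_combiner

    # Either feature is enough to trigger the combiner optimization.
    run_optimizer = use_combiner or map_agg
    return {
        "run_combiner_optimization": run_optimizer,
        # The combiner plan is dropped again if the combiner itself is disabled.
        "keep_combiner_plan": run_optimizer and use_combiner,
        "use_map_partial_agg": run_optimizer and map_agg,
    }
```

For example, setting pig.exec.nocombiner to true and pig.exec.mapPartAgg to true yields a plan that still runs the optimization but keeps only the map-side aggregation.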

          Jie Li added a comment -

          Attached some benchmark results of a group-by aggregation on different datasets (TPC-H and Pigmix) with different selectivity and with combiner/PartialAgg turned on/off respectively.

          'none' means not using combiner or PartialAgg. 'combiner' means only using the combiner. 'hash' means only using the PartialAgg (with some hack). 'hash+combiner' means enabling PartialAgg. For the latter two we configured the minimum reduction to 1 so PartialAgg is never auto-disabled (otherwise it would currently be auto-disabled in all cases). For TPC-H we also ran Hive with default settings for reference.

          The titles above each chart show the number of input records and output records of each query. For example, "60M -> 4 reduction" means there are 60 million input records and four output records. For both datasets we used 10GB of data and ran on a single machine, which is OK here because we are comparing PartialAgg with the combiner, so the network doesn't matter much.

          From the results we can observe:

          1) PartialAgg is more efficient than the combiner, which is as expected and should be leveraged;
          2) the combiner is unnecessary when PartialAgg is used;
          3) the PartialAgg/combiner overhead can be significant if the data reduction rate is low.


            People

            • Assignee: Unassigned
            • Reporter: Jie Li
            • Votes: 0
            • Watchers: 4
