PIG-2888: Improve performance of POPartialAgg

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.11
    • Component/s: None
    • Labels: None

      Description

      During performance testing, we found that POPartialAgg can cause performance degradation for Pig jobs when the Algebraic UDFs it's being applied to aren't well suited to the operator's assumptions. Changing the implementation to a more flexible hash-based model can provide significant performance improvements.

      Attachments

      1. partialagg_patch_1.patch
        29 kB
        Dmitriy V. Ryaboy
      2. partialagg_patch_2.patch
        39 kB
        Dmitriy V. Ryaboy
      3. partialagg_patch_3.patch
        40 kB
        Dmitriy V. Ryaboy
      4. partialagg_patch_4.patch
        42 kB
        Dmitriy V. Ryaboy
      5. partialagg_patch_5.patch
        43 kB
        Dmitriy V. Ryaboy
      6. partialagg_patch_6.patch
        44 kB
        Dmitriy V. Ryaboy
      7. PIG-2888.final.patch
        97 kB
        Dmitriy V. Ryaboy

        Activity

        Dmitriy V. Ryaboy added a comment -

        The current implementation makes two key assumptions that are frequently violated in real-life datasets and scripts:

        1) The intermediate UDF is cheap to invoke
        2) Records come in mostly-grouped order (records with the same key tend to follow each other).

        When condition 2 is not satisfied, POPartialAgg winds up calling the intermediate UDF on all accumulated values so far for a given key, plus a new tuple, for every single tuple it sees. This causes a significant performance degradation.

        Instead, we propose accumulating tuples across the board until a memory threshold is reached. Once this threshold is reached, all keys and tuples are fed into the intermediate UDF and the results put into a second-level map (presumably, having been significantly shrunk by the intermediate UDF). This repeats until the second-level map hits its threshold, at which point it is summarized and its values replaced with the aggregated ones. If after such a reduction the memory occupied by the hashmap is still near the threshold, the results are returned to the regular MR pipeline.
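
        A minimal sketch of this two-tier scheme, using illustrative names and simple tuple-count thresholds (the real POPartialAgg additionally handles memory estimation, spilling, and Pig's tuple pipeline):

          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;
          import java.util.function.BiFunction;

          // Sketch only: K = grouping key, V = tuple; udf stands in for the
          // Algebraic UDF's intermediate function.
          class TwoLevelAgg<K, V> {
              private final Map<K, List<V>> rawMap = new HashMap<>();
              private final Map<K, List<V>> processedMap = new HashMap<>();
              private final BiFunction<K, List<V>, V> udf;
              private final int firstTierThreshold;
              private final int secondTierThreshold;
              private int rawCount;
              private int processedCount;

              TwoLevelAgg(BiFunction<K, List<V>, V> udf, int t1, int t2) {
                  this.udf = udf;
                  this.firstTierThreshold = t1;
                  this.secondTierThreshold = t2;
              }

              /** Returns tuples to hand to the MR pipeline, or null if the input was absorbed. */
              List<V> add(K key, V value) {
                  rawMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
                  if (++rawCount < firstTierThreshold) {
                      return null;
                  }
                  // First tier full: run the intermediate UDF once per key and move
                  // the (hopefully much smaller) results into the second-tier map.
                  for (Map.Entry<K, List<V>> e : rawMap.entrySet()) {
                      processedMap.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                                  .add(udf.apply(e.getKey(), e.getValue()));
                      processedCount++;
                  }
                  rawMap.clear();
                  rawCount = 0;
                  if (processedCount < secondTierThreshold) {
                      return null;
                  }
                  // Second tier full: re-aggregate it in place, one value per key.
                  processedCount = 0;
                  for (Map.Entry<K, List<V>> e : processedMap.entrySet()) {
                      List<V> one = new ArrayList<>();
                      one.add(udf.apply(e.getKey(), e.getValue()));
                      e.setValue(one);
                      processedCount++;
                  }
                  if (processedCount < secondTierThreshold) {
                      return null;
                  }
                  // Reduction was insufficient: flush everything downstream.
                  List<V> out = new ArrayList<>();
                  for (List<V> vs : processedMap.values()) {
                      out.addAll(vs);
                  }
                  processedMap.clear();
                  processedCount = 0;
                  return out;
              }
          }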

        Dmitriy V. Ryaboy added a comment -

        The attached patch is an initial pass at this implementation. Reading it as a diff may be hard – about 70% of the code in POPartialAgg changed – I recommend applying it to a git branch and looking at the class directly.

        I have not implemented memory-based triggering yet; for now I am just relying on hardcoded limits on the number of tuples in the caches.

        I have also not implemented the functionality to automatically turn off hash-based aggregation.

        Tests (except the memory setting related tests) pass.

        Test runs on synthetic data both in local mode and on a cluster produced correct data.

        Cluster runs indicate significant improvement in overall speed of execution when using this approach.

        Dmitriy V. Ryaboy added a comment -

        Attaching a second version. It's ready for review.

        This takes care of memory estimation (and actually looks at the number of operators instead of hardcoding a magic "3"), and turns itself off if the reduction is insufficient.

        Would love to get third-party verification of the speed improvements. Maybe someone who has recent PigMix results can rerun with this patch?

        One of the test cases (TestPOPartialAgg.testPartialMultiInput1HashMemEmpty) still fails, because it assumes that even if no memory is allocated to internal cached bags, consecutive keys still get aggregated. That's an assumption that's pretty specific to the old implementation. Does anyone think that feature is critical? If not, I would like to remove the test.

        Dmitriy V. Ryaboy added a comment -

        Minor logging and spill perf improvements (reusing the iterator, forcing an agg if any list gets too big, being slightly more clever about hashmap sizing).
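
        For reference, one common pre-sizing idiom (an assumption about what "being slightly more clever about hashmap sizing" refers to here; expectedKeys is a hypothetical estimate):

          // Pre-size so that expectedKeys entries fit without rehashing
          // under HashMap's default 0.75 load factor.
          int expectedKeys = 10000; // hypothetical estimate
          Map<Object, Object> m = new HashMap<Object, Object>((int) (expectedKeys / 0.75f) + 1);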

        Dmitriy V. Ryaboy added a comment -

        None of the PigMix queries hits the particular bad behavior this patch is meant to address. I've verified that the speed is on par with the previous implementation for those "good" use cases.

        Here is a script for which Pig with this patch finishes in 57 seconds, while without the patch, it takes 13 mins 48 secs:

        rmf tmp/delme
        l = load 'data.txt';
        x = foreach l generate $0 as l, (int) (RANDOM() * 10000) as num; 
        g = foreach (group x by num % 100) { d = distinct x.num; generate SUM(d); }
        store g into 'tmp/delme';
        

        The data file contains about 7 million rows, one letter each.
        This is an intentionally skewed example, but we've encountered similar problems with real data, particularly when grouping by high-cardinality columns like user_id and subsequently performing algebraic operations on nested distincts.

        Dmitriy V. Ryaboy added a comment -

        Significant improvements to the transitions from the raw to the processed map. Better memory utilization estimation. Better logging.

        While profiling, also noticed an inordinate amount of time being spent in Distinct$Initial's bag registration, fixed that.

        The task that I cited as taking 57 seconds with this patch earlier? It now takes 30 seconds. Also saw 40% speed improvement vs older version of this patch on a production job.

        Please review.

        Julien Le Dem added a comment -

        Awesome. Stop now or it will start to be negative soon.
        Comments:

        • There's a "pig.exec.nocombiner" that was not replaced by a constant.
        • It would be nice to have a consistent way of getting booleans (and floats) from the conf. Something like:
          // e.g., on PigConfiguration:
          public static boolean getBoolean(Properties p, String key) {
              return "true".equals(p.getProperty(key, "false"));
          }
        • some of the class description was still applicable
          /**
           * Do partial aggregation in map plan. It uses a hash-map to aggregate. 
           * ...
           */
           public class POPartialAgg extends PhysicalOperator {
          
        • what is the reason for this particular value?
           private static final int MAX_LIST_SIZE = 1 << 13 - 1;
          
        • It looks like this could be a HashSet as the value never gets used (but there's no WeakHashSet, so I guess I got my answer). It could just as well be WeakHashMap<POPartialAgg, ?>. Don't you want a visitor to just list them all once and set the count? That way you would not have to worry about keeping a reference on them.
          private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>();
          
        • +0.5 so that it is never 0? Math.min(1, ...) is more readable.
           firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
           secondTierThreshold = (int) (0.5 + totalTuples *  (1f / sizeReduction));
          
        • LOG.info() should be wrapped in if (LOG.isInfoEnabled()) { ... } for perf.

        • in aggregateSecondLevel() can't the processedInputMap be reused?
        • in getMinOutputReductionFromProp(), if minReduction <= 0 it should throw an exception.
        Dmitriy V. Ryaboy added a comment -

        There's a "pig.exec.nocombiner" that was not replaced by a constant.

        Fixed.

        It would be nice to have a consistent way of getting booleans (and floats) from the conf

        Feels like scope creep... maybe in another ticket? I don't want to get into how to design that around Properties, Configurations, and PigConfigurations.

        some of the class description was still applicable

        Added better docs.

        what is the reason for this particular value?

        Bad math. Fixed the math and added an explanation of how I got there.
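
        (For context: in Java, '-' binds tighter than '<<', so the quoted constant evaluates to 4096 rather than 8191. Assuming 8191 was the intended value, the corrected form would be:)

          int a = 1 << 13 - 1;   // parsed as 1 << (13 - 1) == 4096
          int b = (1 << 13) - 1; // == 8191, the likely intended value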

        Don't you want a visitor to just list them all once and set the count? That way you would not have to worry about keeping a reference on them.

        I could do that, but this feels much cleaner – no visitors, no serialization, no changes to the MRCompiler/JCCompiler, very self-contained, and works at runtime instead of having to be preset by the planner.

        +0.5 so that it is never 0? Math.min(1, ...) is more readable.

        No, +0.5 so that it's a round() instead of a floor().
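
        A quick worked example with hypothetical numbers (totalTuples = 1000, sizeReduction = 3):

          int floorVersion = (int) (1000 * (1f - 1f / 3));       // 666: the cast truncates
          int roundVersion = (int) (0.5 + 1000 * (1f - 1f / 3)); // 667: +0.5 rounds to nearest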

        LOG.info() should be wrapped in if (LOG.isInfoEnabled()) { ... } for perf

        Done for the places where it matters (functions invoked more than once, and messages whose args are not constants).
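
        For illustration (hypothetical message; numKeys and numTuples stand in for non-constant args):

          // Without the guard, the message string is concatenated even when
          // INFO logging is disabled; the guard skips that work entirely.
          if (LOG.isInfoEnabled()) {
              LOG.info("first-tier flush: " + numKeys + " keys, " + numTuples + " tuples");
          }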

        in aggregateSecondLevel() can't the processedInputMap be reused?

        No – aggregate() adds to the list of tuples in the target map; we want to overwrite in this case.
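
        Illustratively, the difference is (hypothetical names, not the actual POPartialAgg code):

          // aggregate() appends the reduced tuple to the key's existing list:
          map.computeIfAbsent(key, k -> new ArrayList<>()).add(reduced);
          // aggregateSecondLevel() must replace the old list outright:
          List<Tuple> one = new ArrayList<Tuple>();
          one.add(reduced);
          map.put(key, one);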

        in getMinOutputReductionFromProp(), if minReduction <= 0 it should throw an exception.

        Added a log message instead.

        Julien Le Dem added a comment -

        +1

        Dmitriy V. Ryaboy added a comment -

        Same, but with the offending test removed.

        Dmitriy V. Ryaboy added a comment -

        Attaching final patch, committing under Julien's +1.

        Three changes added:
        1) Missed a test class earlier which needed the static string moved to PigConfiguration. Done now.

        2) A slight change to build.xml to ensure junit3 comes before junit4 in the test classpath. Otherwise the build occasionally failed to compile, depending on the environment.

        3) An unrelated fix to TestPOCast, which was failing and thus preventing me from passing test-commit.


          People

          • Assignee: Dmitriy V. Ryaboy
          • Reporter: Dmitriy V. Ryaboy
          • Votes: 0
          • Watchers: 5
