Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      As reported by Scott Carey in PIG-479, the combiner is not used for COGROUP, even when the functions applied to the bags are algebraic.
      Quoting from the comment:
      "For example, I'm not quite sure why this one doesn't use a combiner - it reads ~350x as much input bytes from HDFS as its reduce output, a combiner would be very effective:

      J = COGROUP
      UV BY (s, d, h, g, p, pa, st) OUTER,
      UC BY (s, d, h, g, p, pa, st) OUTER,
      AT BY (s, d, h, g, p, pa, st) OUTER,
      V BY (s, d, h, g, p, pa, st) OUTER,
      C BY (s, d, h, g, p, pa, st) OUTER;

      OUTPUT = FOREACH J GENERATE
      FLATTEN(group) as (s, d, h, g, p, pa, st),
      COUNT_STAR(C) as c,
      COUNT_STAR(V) as v,
      SUM(AT.p1) as p1,
      SUM(AT.p2) as p2,
      SUM(AT.p3) as p3,
      SUM(UC.q) as ucq,
      SUM(UC.r) as ucr,
      SUM(UV.q) as uvq,
      SUM(UV.r) as uvr;
      "
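      For context on why the functions in this query are combinable: SUM and COUNT_STAR can each be split into an Initial step on the map side, an Intermediate step in the combiner, and a Final step on the reducer. In Pig this contract is expressed through the `Algebraic` interface (`getInitial()`, `getIntermed()`, `getFinal()`); the sketch below does not use Pig's API. `AlgebraicAgg`, `SumAgg`, `CountStarAgg` and `ThreePhase` are hypothetical names, and values are simplified to `long`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical three-phase aggregate; a simplified stand-in, not Pig's Algebraic/EvalFunc API. */
interface AlgebraicAgg {
    long initial(List<Long> rawValues);     // map side: raw values of one bag -> partial
    long intermediate(List<Long> partials); // combiner: several partials -> one partial
    long fin(List<Long> partials);          // reduce side: partials -> final result
}

/** SUM: each phase adds its inputs. */
final class SumAgg implements AlgebraicAgg {
    public long initial(List<Long> rawValues) { return add(rawValues); }
    public long intermediate(List<Long> partials) { return add(partials); }
    public long fin(List<Long> partials) { return add(partials); }

    private static long add(List<Long> xs) {
        long total = 0;
        for (long x : xs) total += x;
        return total;
    }
}

/** COUNT_STAR: the map side counts tuples; later phases add the counts. */
final class CountStarAgg implements AlgebraicAgg {
    public long initial(List<Long> rawValues) { return rawValues.size(); }
    public long intermediate(List<Long> partials) {
        long total = 0;
        for (long p : partials) total += p;
        return total;
    }
    public long fin(List<Long> partials) { return intermediate(partials); }
}

final class ThreePhase {
    /** Runs Initial per map-side chunk, one combiner pass, then Final on the reducer. */
    static long run(AlgebraicAgg agg, List<List<Long>> mapChunks) {
        List<Long> partials = new ArrayList<>();
        for (List<Long> chunk : mapChunks) {
            partials.add(agg.initial(chunk));
        }
        long combined = agg.intermediate(partials); // what the combiner would ship
        return agg.fin(List.of(combined));
    }
}
```

      For example, `ThreePhase.run(new SumAgg(), List.of(List.of(1L, 2L), List.of(3L, 4L, 5L)))` returns 15, and the reducer receives one partial rather than the five raw values; this is the kind of reduction the quoted comment expects a combiner to deliver.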

        Activity

        Thejas M Nair made changes -
        Assignee Thejas M Nair [ thejas ]
        Olga Natkovich made changes -
        Fix Version/s: set to 0.9.0 [ 12315191 ]
        Thejas M Nair added a comment -

        In the existing implementation, the optimizer (CombinerOptimizer.java) adds a combiner to the plan only when the (co)group has a single input. The combiner code is also written to handle only a single input (see POCombinerPackage.java).

        Some rough, initial and incomplete thoughts about an implementation for (co)group with multiple inputs:

        The combiner should (or can) be used only when every expression in the FOREACH following the (co)group takes its input from algebraic UDFs or group-key columns, and the arguments of each algebraic UDF come from only one of the input relations.
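        The eligibility rule above can be sketched as a simple check over the FOREACH's generate expressions. This is a hypothetical model, not CombinerOptimizer's actual code: `GenExpr`, its `Kind` values and `CombinerEligibility.canUseCombiner` are illustrative names, and each expression is reduced to its kind plus the set of cogroup input indexes it reads.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified view of one expression in the FOREACH after a (co)group. */
final class GenExpr {
    enum Kind { GROUP_KEY, ALGEBRAIC_UDF, OTHER }

    final Kind kind;
    final Set<Integer> inputsReferenced; // indexes of the (co)group inputs the expression reads

    GenExpr(Kind kind, Set<Integer> inputsReferenced) {
        this.kind = kind;
        this.inputsReferenced = inputsReferenced;
    }
}

final class CombinerEligibility {
    /**
     * Encodes the proposed rule: every expression is either a group-key projection
     * or an algebraic UDF whose arguments come from exactly one input.
     */
    static boolean canUseCombiner(List<GenExpr> exprs) {
        for (GenExpr e : exprs) {
            switch (e.kind) {
                case GROUP_KEY:
                    break;
                case ALGEBRAIC_UDF:
                    if (e.inputsReferenced.size() != 1) return false; // args span several inputs
                    break;
                default:
                    return false; // non-algebraic UDFs, raw bag projections, etc.
            }
        }
        return true;
    }
}
```

        Assuming inputs are numbered in COGROUP order from 0, `SUM(AT.p1)` in the description reads only input 2 and `FLATTEN(group)` is a group-key projection, so under this rule the whole FOREACH would qualify.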

        In the combiner, tuples from each input stream should be grouped and combined separately, i.e., as if the group-by columns were 'group-key-columns + input-index'. POPackage (or a new subclass) will need to support this grouping. A POForEach will follow the POPackage; it can use a conditional operator (POBinCond) on input-index to evaluate the corresponding UDFs.
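        A minimal sketch of the proposed combiner behavior, assuming each map-output record carries its group key, its input index and a single `long` partial (a real plan would carry a tuple of partials per input). `Tagged` and `MultiInputCombiner` are hypothetical names; the per-input `BinaryOperator` map stands in for the POBinCond dispatch on input-index.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical map-output record: group key, index of the cogroup input, one partial value. */
final class Tagged {
    final String key;
    final int input;
    final long partial;

    Tagged(String key, int input, long partial) {
        this.key = key;
        this.input = input;
        this.partial = partial;
    }
}

final class MultiInputCombiner {
    /**
     * Groups by (group key, input index) so partials from different inputs are never
     * merged, and picks the intermediate function by input index (the role the
     * comment assigns to a POBinCond on input-index).
     */
    static List<Tagged> combine(List<Tagged> mapOutput,
                                Map<Integer, BinaryOperator<Long>> intermedByInput) {
        Map<List<Object>, Tagged> acc = new LinkedHashMap<>();
        for (Tagged t : mapOutput) {
            List<Object> compositeKey = List.of(t.key, t.input); // group key + input index
            Tagged prev = acc.get(compositeKey);
            if (prev == null) {
                acc.put(compositeKey, t);
            } else {
                long merged = intermedByInput.get(t.input).apply(prev.partial, t.partial);
                acc.put(compositeKey, new Tagged(t.key, t.input, merged));
            }
        }
        // Output stays tagged so the reducer can still rebuild one bag per input.
        return new ArrayList<>(acc.values());
    }
}
```

        With input 0 mapped to addition (as for a SUM or COUNT_STAR intermediate) and input 1 mapped to `Math.max` (used here only to show that different inputs can use different functions), the records (k,0,2), (k,1,5), (k,0,3), (k,1,7) combine to (k,0,5) and (k,1,7): partials from different inputs are never mixed even though they share a group key.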

        CombinerOptimizer will need to identify the relational input that each algebraic UDF gets its input from, and add to the map physical plan a POForEach with 'Initial' calls to the UDFs for each input.
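        The optimizer step above can be sketched as grouping the algebraic calls by source input. This is a hypothetical model: `AlgCall` and `MapSidePlanner` are illustrative names, inputs are assumed to be numbered in COGROUP order starting at 0, and rendered strings such as `SUM.Initial(p1)` only label the Initial-phase call rather than naming a real Pig class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical description of one algebraic call in the FOREACH, e.g. SUM(AT.p1). */
final class AlgCall {
    final String udf;    // e.g. "SUM"
    final int input;     // index of the cogroup input the argument is projected from
    final String column; // projected column, or "*" for the whole bag

    AlgCall(String udf, int input, String column) {
        this.udf = udf;
        this.input = input;
        this.column = column;
    }
}

final class MapSidePlanner {
    /**
     * Buckets algebraic calls by the input their argument comes from and renders,
     * per input, the Initial-phase calls a map-side FOREACH for that input would run.
     */
    static Map<Integer, List<String>> initialCallsByInput(List<AlgCall> calls) {
        Map<Integer, List<String>> plan = new TreeMap<>(); // sorted by input index
        for (AlgCall c : calls) {
            plan.computeIfAbsent(c.input, k -> new ArrayList<>())
                .add(c.udf + ".Initial(" + c.column + ")");
        }
        return plan;
    }
}
```

        For the query in the description (UV=0, UC=1, AT=2, V=3, C=4), `SUM(AT.p1)` and `SUM(AT.p2)` land in input 2's map-side FOREACH, while `COUNT_STAR(C)` lands in input 4's.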

        Thejas M Nair created issue -

          People

          • Assignee:
            Unassigned
            Reporter:
            Thejas M Nair
          • Votes:
            0
            Watchers:
            4
