Pig / PIG-697

Proposed improvements to pig's optimizer

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.4.0
    • Component/s: impl
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      I propose the following changes to Pig's optimizer, plan, and operator functionality to support more robust optimization:

      1) Remove the required array from Rule. Rules will then match only exact patterns instead of allowing missing elements in the pattern.
      This has the downside that if a given rule applies to two patterns (say Load->Filter->Group and Load->Group) you have to write two rules. But it has the upside
      that the resulting rules know exactly what they are getting. The original intent of the required array was to reduce the number of rules that needed to be
      written, but the resulting rules have to do a lot of work to understand the operators they are working with. With exact matches only, each rule will know
      exactly which operators it is working on and can apply the logic of shifting the operators around. All four of the existing rules set all entries of required
      to true, so removing the array will have no effect on them.

      2) Change PlanOptimizer.optimize to iterate over the rules until there are no conversions or a certain number of iterations has been reached. Currently the
      function is:

          public final void optimize() throws OptimizerException {
              RuleMatcher matcher = new RuleMatcher();
              for (Rule rule : mRules) {
                  if (matcher.match(rule)) {
                      // It matches the pattern.  Now check if the transformer
                      // approves as well.
                      List<List<O>> matches = matcher.getAllMatches();
                      for (List<O> match : matches) {
                          if (rule.transformer.check(match)) {
                              // The transformer approves.
                              rule.transformer.transform(match);
                          }
                      }
                  }
              }
          }
      

      It would change to be:

          public final void optimize() throws OptimizerException {
              RuleMatcher matcher = new RuleMatcher();
              boolean sawMatch;
              int numIterations = 0;
              do {
                  sawMatch = false;
                  for (Rule rule : mRules) {
                      // Skip rules whose pattern does not occur in the plan.
                      if (!matcher.match(rule)) {
                          continue;
                      }
                      List<List<O>> matches = matcher.getAllMatches();
                      for (List<O> match:matches) {
                          // It matches the pattern.  Now check if the transformer
                          // approves as well.
                          if (rule.transformer.check(match)) {
                              // The transformer approves.
                              sawMatch = true;
                              rule.transformer.transform(match);
                          }
                      }
                  }
                  // Not sure if 1000 is the right number of iterations, maybe it
                  // should be configurable so that large scripts don't stop too 
                  // early.
              } while (sawMatch && numIterations++ < 1000);
          }
      

      The reason for limiting the number of iterations is to avoid infinite loops. The reason for iterating over the rules is so that each rule can be applied multiple
      times as necessary. This allows us to write simple rules, mostly swaps between neighboring operators, without worrying about getting the plan right in one pass.
      For example, we might have a plan that looks like Load->Join->Filter->Foreach, and we want to optimize it to Load->Foreach->Filter->Join. With a few simple
      rules (such as swap filter and join, and swap foreach and filter), applied iteratively, we can get from the initial to the final plan without needing to
      understand the big picture of the entire plan.
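
      The iterate-until-no-change loop can be tried out on a toy model. The sketch below is an illustration only, not Pig code: a linear plan is a list of
      operator names, and the hypothetical SwapRule class swaps one adjacent pair of operators. Because this toy model can only swap neighbors, it assumes a third
      rule (swap join and foreach) in addition to the two named above. The names IterativeSwapSketch, SwapRule, and maxIterations are all assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model (not Pig code): a linear plan is a list of operator names. */
public class IterativeSwapSketch {

    /** Hypothetical rule: if 'upstream' is immediately followed by 'downstream', swap them. */
    static final class SwapRule {
        final String upstream;
        final String downstream;

        SwapRule(String upstream, String downstream) {
            this.upstream = upstream;
            this.downstream = downstream;
        }

        /** Applies the rule at the first matching position; returns true if the plan changed. */
        boolean apply(List<String> plan) {
            for (int i = 0; i + 1 < plan.size(); i++) {
                if (plan.get(i).equals(upstream) && plan.get(i + 1).equals(downstream)) {
                    Collections.swap(plan, i, i + 1);
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Re-applies every rule until a full pass changes nothing or the cap is hit.
     * Returns the number of passes that changed the plan.
     */
    static int optimize(List<String> plan, List<SwapRule> rules, int maxIterations) {
        int numIterations = 0;
        boolean sawMatch;
        do {
            sawMatch = false;
            for (SwapRule rule : rules) {
                if (rule.apply(plan)) {
                    sawMatch = true;
                }
            }
        } while (sawMatch && ++numIterations < maxIterations);
        return numIterations;
    }

    public static void main(String[] args) {
        List<String> plan = new ArrayList<>(Arrays.asList("Load", "Join", "Filter", "Foreach"));
        List<SwapRule> rules = Arrays.asList(
                new SwapRule("Join", "Filter"),
                new SwapRule("Filter", "Foreach"),
                new SwapRule("Join", "Foreach")); // assumed third rule, see lead-in
        optimize(plan, rules, 1000);
        System.out.println(plan); // prints [Load, Foreach, Filter, Join]
    }
}
```

      In this model, a pair of rules that undo each other (swap A and B, swap B and A) would never settle, which is the case the iteration cap guards against.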

      3) Add three calls to OperatorPlan:

      /**
       * Swap two operators in a plan.  Both of the operators must have single
       * inputs and single outputs.
       * @param first operator
       * @param second operator
       * @throws PlanException if either operator is not single input and output.
       */
      public void swap(E first, E second) throws PlanException {
          ...
      }
      
      /**
       * Push one operator in front of another.  This function is for use when
       * the first operator has multiple inputs.  The caller can specify
       * which input of the first operator the second operator should be pushed to.
       * @param first operator, assumed to have multiple inputs.
       * @param second operator, will be pushed in front of first
       * @param inputNum, indicates which input of the first operator the second
       * operator will be pushed onto.  Numbered from 0.
       * @throws PlanException if inputNum does not exist for first operator
       */
      public void pushBefore(E first, E second, int inputNum) throws PlanException {
          ...
      }
      
      /**
       * Push one operator after another.  This function is for use when the second
       * operator has multiple outputs.  The caller can specify which output of the
       * second operator the first operator should be pushed to.
       * @param first operator, will be pushed after the second operator
       * @param second operator, assumed to have multiple outputs
       * @param outputNum indicates which output of the second operator the first 
       * operator will be pushed onto.  Numbered from 0.
       * @throws PlanException if outputNum does not exist for second operator
       */
      public void pushAfter(E first, E second, int outputNum) throws PlanException {
          ...
      }
      

      The rules in the optimizer can use these three functions, along with the existing insertBetween(), replace(), and removeAndReconnect() calls to operate on the
      plan.
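
      To make the intended effect of swap() concrete, here is a minimal sketch on a stand-in plan. It is not the OperatorPlan implementation: ToyPlan, connect(),
      and disconnect() are hypothetical names, operators are plain strings, and IllegalStateException stands in for PlanException. It also assumes that the two
      operators are adjacent (first feeds second), which the Javadoc above does not state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SwapSketch {

    /** Hypothetical stand-in for OperatorPlan; edges run from producer to consumer. */
    static final class ToyPlan {
        private final Map<String, List<String>> successors = new HashMap<>();
        private final Map<String, List<String>> predecessors = new HashMap<>();

        void add(String op) {
            successors.putIfAbsent(op, new ArrayList<>());
            predecessors.putIfAbsent(op, new ArrayList<>());
        }

        void connect(String from, String to) {
            add(from);
            add(to);
            successors.get(from).add(to);
            predecessors.get(to).add(from);
        }

        void disconnect(String from, String to) {
            successors.get(from).remove(to);
            predecessors.get(to).remove(from);
        }

        List<String> getSuccessors(String op) {
            return successors.get(op);
        }

        List<String> getPredecessors(String op) {
            return predecessors.get(op);
        }

        private boolean isSingleInputOutput(String op) {
            return predecessors.get(op).size() == 1 && successors.get(op).size() == 1;
        }

        /** Turns before -> first -> second -> after into before -> second -> first -> after. */
        void swap(String first, String second) {
            if (!isSingleInputOutput(first) || !isSingleInputOutput(second)) {
                throw new IllegalStateException("both operators must have a single input and output");
            }
            if (!successors.get(first).get(0).equals(second)) {
                throw new IllegalStateException("this sketch only swaps adjacent operators");
            }
            String before = predecessors.get(first).get(0);
            String after = successors.get(second).get(0);
            // Remove the three old edges, then add the three new ones.
            disconnect(before, first);
            disconnect(first, second);
            disconnect(second, after);
            connect(before, second);
            connect(second, first);
            connect(first, after);
        }
    }

    public static void main(String[] args) {
        ToyPlan plan = new ToyPlan();
        plan.connect("Load", "Foreach");
        plan.connect("Foreach", "Filter");
        plan.connect("Filter", "Store");
        plan.swap("Foreach", "Filter");
        System.out.println(plan.getSuccessors("Load")); // prints [Filter]
    }
}
```

      pushBefore() and pushAfter() would differ mainly in which input or output edge of the multi-input or multi-output operator is rewired.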

      4) Add a new call to Operator:

      /**
       * Make any necessary changes to a node based on a change of position in the
       * plan.  This allows operators to rewire their projections, etc. when they
       * are relocated in a plan.
       * @param oldPred Operator that was previously the predecessor.
       * @param newPred Operator that will now be the predecessor.
       * @throws PlanException
       */
      public abstract void rewire(Operator oldPred, Operator newPred) throws PlanException;
      

      This method will be called by swap, pushBefore, pushAfter, insertBetween, replace, and removeAndReconnect in OperatorPlan whenever an operator is moved
      around, so that the operator has a chance to make any necessary changes.

      5) Add new calls to LogicalOperator and PhysicalOperator:

      /**
       * A struct detailing how a projection is altered by an operator.
       */
      public class ProjectionMap {
          /**
           * Quick way for an operator to note that its input and output are the
           * same.
           */
          public boolean noChange;
      
          /**
           * Map of field changes, with keys being the output fields of the
           * operator and values being the input fields.  Fields are numbered from
           * 0.  So a foreach operator derived from
           * 'B = foreach A generate $0, $2, $3, udf($1)'
           * would produce a mapping of 0->0, 1->2, 2->3.
           */
          public Map<Integer, Integer> mappedFields;
      
          /**
           * List of fields removed from the input.  This includes fields that were
           * transformed, and thus are no longer the same fields.  Using the
           * example foreach given under mappedFields, this list would contain '1'.
           */
          public List<Integer> removedFields;
      
          /**
           * List of fields in the output of this operator that were created by this
           * operator.  Using the example foreach given under mappedFields, this list
           * would contain '3'.
           */
          public List<Integer> addedFields;
      }
      
      /**
       * Produce a map describing how this operator modifies its projection.
       * @return ProjectionMap; null indicates that the operator does not know how
       * the projection changes, for example a join of two inputs where one input
       * does not have a schema.
       */
      public abstract ProjectionMap getProjectionMap();
      
      /**
       * Get a list of fields that this operator requires.  This is not necessarily
       * equivalent to the list of fields the operator projects.  For example,
       * a filter will project anything passed to it, but requires only the fields
       * explicitly referenced in its filter expression.
       * @return list of fields, numbered from 0.
       */
      public abstract List<Integer> getRequiredFields();
      

      These calls will be called by optimizer rules to determine whether or not a swap can be done (for example, you can't swap two operators if the second one uses a
      field added by the first), and once the swap is done they will be used by rewire to understand how to map projections in the operators.
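
      As an illustration of how a rule might combine these two calls, the sketch below decides whether a second operator can be moved in front of a first one,
      and if so which of the first operator's input fields it would then need. This is a sketch under assumptions, not the proposed implementation: the helper
      translateRequiredFields() and the ProjectionMap constructor are hypothetical, and the struct is copied here only so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProjectionCheckSketch {

    /** Copy of the proposed struct; the constructor is added for this example only. */
    static final class ProjectionMap {
        final boolean noChange;
        final Map<Integer, Integer> mappedFields; // output field -> input field
        final List<Integer> removedFields;
        final List<Integer> addedFields;

        ProjectionMap(boolean noChange, Map<Integer, Integer> mappedFields,
                      List<Integer> removedFields, List<Integer> addedFields) {
            this.noChange = noChange;
            this.mappedFields = mappedFields;
            this.removedFields = removedFields;
            this.addedFields = addedFields;
        }
    }

    /**
     * Hypothetical helper.  Given the projection map of the first operator and
     * the fields the second operator requires (numbered against the first
     * operator's output), returns those fields renumbered against the first
     * operator's input, or null if the swap is not possible.
     */
    static List<Integer> translateRequiredFields(ProjectionMap first, List<Integer> requiredBySecond) {
        if (first == null) {
            return null; // the first operator does not know its projection
        }
        if (first.noChange) {
            return new ArrayList<>(requiredBySecond);
        }
        List<Integer> translated = new ArrayList<>();
        for (Integer field : requiredBySecond) {
            if (first.addedFields.contains(field)) {
                return null; // the second operator uses a field created by the first
            }
            Integer source = first.mappedFields.get(field);
            if (source == null) {
                return null; // no known input field behind this output field
            }
            translated.add(source);
        }
        return translated;
    }

    public static void main(String[] args) {
        // B = foreach A generate $0, $2, $3, udf($1)
        Map<Integer, Integer> mapped = new HashMap<>();
        mapped.put(0, 0);
        mapped.put(1, 2);
        mapped.put(2, 3);
        ProjectionMap foreach = new ProjectionMap(false, mapped, Arrays.asList(1), Arrays.asList(3));

        // A filter on B's $1 reads A's $2, so it could move in front of the foreach.
        System.out.println(translateRequiredFields(foreach, Arrays.asList(1))); // prints [2]
        // A filter on B's $3 depends on udf($1), so it cannot.
        System.out.println(translateRequiredFields(foreach, Arrays.asList(3))); // prints null
    }
}
```

      The translated list is also the kind of information rewire would need in order to renumber the moved operator's field references.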

      6) It's not clear that the RuleMatcher, in its current form, will work with rules that are not linear. That is, it matches rules that look like:

      Operators {Foreach, Filter}
      Edges {0->1}

      But I don't know if it will match rules that look like:

      Operators {Scan, Scan, Join}
      Edges {0->2, 1->2}

      For the optimizer to be able to determine join types and operations with splits, it will have to be able to do that.
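
      One way to picture what matching a non-linear rule involves is a small backtracking search that assigns each pattern node to a distinct plan node of the
      same type and checks that every pattern edge exists in the plan. The sketch below is an assumption-laden illustration and says nothing about how RuleMatcher
      works internally: operators are strings, edges are index pairs, and all names are hypothetical.

```java
import java.util.Arrays;

public class NonLinearMatchSketch {

    /**
     * Returns, for each pattern node, the index of the plan node it was matched
     * to, or null if the pattern does not occur in the plan.
     */
    static int[] match(String[] patternOps, int[][] patternEdges,
                       String[] planOps, int[][] planEdges) {
        int[] assignment = new int[patternOps.length];
        Arrays.fill(assignment, -1);
        boolean found = extend(0, assignment, patternOps, patternEdges, planOps, planEdges);
        return found ? assignment : null;
    }

    /** Tries every plan node for pattern node 'next', backtracking on failure. */
    private static boolean extend(int next, int[] assignment,
                                  String[] patternOps, int[][] patternEdges,
                                  String[] planOps, int[][] planEdges) {
        if (next == patternOps.length) {
            return true; // every pattern node has a plan node
        }
        for (int candidate = 0; candidate < planOps.length; candidate++) {
            if (!planOps[candidate].equals(patternOps[next])
                    || alreadyUsed(assignment, next, candidate)) {
                continue;
            }
            assignment[next] = candidate;
            if (edgesHold(next, assignment, patternEdges, planEdges)
                    && extend(next + 1, assignment, patternOps, patternEdges, planOps, planEdges)) {
                return true;
            }
            assignment[next] = -1; // backtrack
        }
        return false;
    }

    private static boolean alreadyUsed(int[] assignment, int upTo, int candidate) {
        for (int i = 0; i < upTo; i++) {
            if (assignment[i] == candidate) {
                return true;
            }
        }
        return false;
    }

    /** Checks each pattern edge whose two endpoints have both been assigned. */
    private static boolean edgesHold(int last, int[] assignment,
                                     int[][] patternEdges, int[][] planEdges) {
        for (int[] edge : patternEdges) {
            if (edge[0] > last || edge[1] > last) {
                continue; // an endpoint is not assigned yet
            }
            if (!hasEdge(planEdges, assignment[edge[0]], assignment[edge[1]])) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasEdge(int[][] edges, int from, int to) {
        for (int[] edge : edges) {
            if (edge[0] == from && edge[1] == to) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patternOps = {"Scan", "Scan", "Join"};
        int[][] patternEdges = {{0, 2}, {1, 2}};
        String[] planOps = {"Scan", "Scan", "Join", "Store"};
        int[][] planEdges = {{0, 2}, {1, 2}, {2, 3}};
        System.out.println(Arrays.toString(match(patternOps, patternEdges, planOps, planEdges)));
        // prints [0, 1, 2]
    }
}
```

      Note that this sketch only requires the pattern's edges to be present. It does not reject a Join that has additional inputs beyond the two Scans, which a
      strict exact-match rule would presumably also need to check.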

      Examples of the types of rules that this optimizer could support:

      1) Pushing filters in front of joins.
      2) Pushing foreachs with flattens (which thus greatly expand the data) down the tree past filters, joins, etc.
      3) Pushing type casting used for schemas in loads down to the point where the field is actually used.
      4) Deciding when to do fragment/replicate join or sort/merge join instead of the standard hash join.
      5) The current optimizations: pushing limit up the tree, making implicit splits explicit, merging load and stream where possible, using the combiner.
      6) Merging filters or foreachs where possible.

      In particular the combiner optimizer hopefully can be completely rewritten to use the optimizer framework to make decisions about how to rework physical plans
      to push work into the combiner.

      1. Optimizer_Phase5.patch
        78 kB
        Santhosh Srinivasan
      2. OptimizerPhase1_part2.patch
        71 kB
        Santhosh Srinivasan
      3. OptimizerPhase1.patch
        126 kB
        Santhosh Srinivasan
      4. OptimizerPhase2.patch
        106 kB
        Santhosh Srinivasan
      5. OptimizerPhase3_parrt1.patch
        199 kB
        Santhosh Srinivasan
      6. OptimizerPhase3_parrt1-1.patch
        199 kB
        Santhosh Srinivasan
      7. OptimizerPhase3_part2_3.patch
        168 kB
        Santhosh Srinivasan
      8. OptimizerPhase4_part1-1.patch
        139 kB
        Santhosh Srinivasan
      9. OptimizerPhase4_part2.patch
        72 kB
        Santhosh Srinivasan


            People

            • Assignee:
              Santhosh Srinivasan
              Reporter:
              Alan Gates
            • Votes:
              0
              Watchers:
              3
