Apache Arrow / ARROW-16986

[C++] Infer project nodes lazily to avoid unnecessary chains when consuming Substrait


Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: C++

    Description

      Naively converting everything projection-like in Substrait (ProjectRel, emit clauses, and complex expressions in relations other than project and filter) into individual project nodes would turn optimal(-ish) Substrait plans into suboptimal Acero plans. Take this (more-or-less optimal) Substrait plan as an example (column indices are as Substrait sees them; column names are given in parentheses to make it easier to follow what's going on):

      • Read column 2 (=A), 3 (=B), and 4 (=C) from a Parquet file (emit: [2 (=A), 3 (=B), 4 (=C)])
      • Project column 1 (=B) + column 2 (=C) to get column 3 (=B+C), then drop column 1 (=B) (emit: [0 (=A), 2 (=C), 3 (=B+C)])
      • Order by column 0 (=A) - column 2 (=B+C), so effectively by A-B-C, then drop column 0 (=A) (emit: [1 (=C), 2 (=B+C)])
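      To double-check the column bookkeeping, here is a rough sketch that replays the plan on column names only. It assumes the usual Substrait semantics (output column i of an emit clause is input column emit[i], ProjectRel appends its expressions after its input columns, SortRel passes its input columns through). ApplyEmit and AppendColumns are hypothetical helpers, and X0/X1 are made-up names for file columns 0 and 1, which the example leaves unnamed.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Column names stand in for real columns (illustration only).
using Columns = std::vector<std::string>;

// Emit clause: output column i is input column emit[i].
Columns ApplyEmit(const Columns& input, const std::vector<int>& emit) {
  Columns out;
  out.reserve(emit.size());
  for (int i : emit) out.push_back(input.at(static_cast<std::size_t>(i)));
  return out;
}

// ProjectRel body: the new expressions are appended after all input columns.
Columns AppendColumns(Columns input, const Columns& extra) {
  input.insert(input.end(), extra.begin(), extra.end());
  return input;
}
```

      Replaying the three relations (emit [2, 3, 4]; append B+C, then emit [0, 2, 3]; emit [1, 2], since the sort itself keeps its columns) ends with the columns C and B+C.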

      Converting this naively, without special cases that fold the emit clauses of ReadRel and ProjectRel into the node that precedes them, would probably yield:

      • For the ReadRel:
        • Scanner that reads all columns
        • Project [field(2), field(3), field(4)] for the emit clause
      • For the ProjectRel:
        • Project [field(0), field(1), field(2), add(field(1), field(2))] for the body
        • Project [field(0), field(2), field(3)] for the emit clause
      • For the SortRel:
        • Project [field(0), field(1), field(2), sub(field(0), field(2))] to get the sort key
        • Order by field(3)
        • Project [field(0), field(1), field(2)] to revert adding the temporary column
        • Project [field(1), field(2)] for the emit clause
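      The naive scheme can be modeled in a few lines to count the damage. This is a sketch only: NaiveConverter and its methods are hypothetical, nodes are plain strings rather than compute::Declaration objects, and each relation is converted in isolation exactly as in the list above.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Joins expression strings with ", ".
std::string Join(const std::vector<std::string>& items) {
  std::string out;
  for (std::size_t i = 0; i < items.size(); ++i) out += (i ? ", " : "") + items[i];
  return out;
}

// "field(i)" for each given column index.
std::vector<std::string> Fields(const std::vector<int>& indices) {
  std::vector<std::string> out;
  for (int i : indices) out.push_back("field(" + std::to_string(i) + ")");
  return out;
}

// field(0) .. field(n - 1): pass all n input columns through unchanged.
std::vector<std::string> AllFields(int n) {
  std::vector<int> indices;
  for (int i = 0; i < n; ++i) indices.push_back(i);
  return Fields(indices);
}

// Hypothetical naive converter: each projection-like construct becomes its own node.
struct NaiveConverter {
  std::vector<std::string> nodes;  // scan first

  void AddProject(const std::vector<std::string>& exprs) {
    nodes.push_back("project [" + Join(exprs) + "]");
  }
  // An emit clause always becomes a Project of field references.
  void AddEmit(const std::vector<int>& emit) { AddProject(Fields(emit)); }

  // ReadRel: scan every column, then apply the emit clause.
  void ReadRel(const std::vector<int>& emit) {
    nodes.push_back("scan");
    AddEmit(emit);
  }
  // ProjectRel: keep all inputs and append the new expressions, then emit.
  void ProjectRel(int num_inputs, const std::vector<std::string>& exprs,
                  const std::vector<int>& emit) {
    std::vector<std::string> body = AllFields(num_inputs);
    body.insert(body.end(), exprs.begin(), exprs.end());
    AddProject(body);
    AddEmit(emit);
  }
  // SortRel with a computed key: add it as a column, sort on it, drop it, then emit.
  void SortRel(int num_inputs, const std::string& key, const std::vector<int>& emit) {
    std::vector<std::string> with_key = AllFields(num_inputs);
    with_key.push_back(key);
    AddProject(with_key);
    nodes.push_back("order_by [field(" + std::to_string(num_inputs) + ")]");
    AddProject(AllFields(num_inputs));
    AddEmit(emit);
  }
};
```

      Feeding it the example (ReadRel with emit [2, 3, 4]; ProjectRel over 3 inputs appending add(field(1), field(2)) with emit [0, 2, 3]; SortRel over 3 inputs on sub(field(0), field(2)) with emit [1, 2]) yields eight nodes, six of which are projections.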

      That's six project nodes, whereas the user would probably expect something like this:

      • Scanner
      • Project [field(2) (=A), field(4) (=C), add(field(3), field(4)) (=B+C)] to drop unneeded columns and compute B+C
      • Project [field(0) (=A), field(1) (=C), field(2) (=B+C), sub(field(0), field(2)) (=A-B-C)] to compute the sort key (not collapsed into the previous Project because that would repeat the B+C subexpression)
      • Sort on field(3)
      • Project [field(1) (=C), field(2) (=B+C)] to drop the temporary field and column A
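      The first Project here is what falls out of collapsing the naive ReadRel emit, ProjectRel body and ProjectRel emit projections: every field reference in the upper projection is replaced by the expression it refers to in the one below. A sketch of that substitution, using a hypothetical toy Expr type rather than the Arrow or Substrait expression classes:

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Toy expression (hypothetical, not the Arrow or Substrait API): a call if
// `function` is set, otherwise a reference to column `field`.
struct Expr {
  std::string function;
  int field = -1;
  std::vector<Expr> args;
};
using Projection = std::vector<Expr>;

Expr Field(int i) { return Expr{"", i, {}}; }
Expr Call(std::string f, std::vector<Expr> args) { return Expr{std::move(f), -1, std::move(args)}; }

// Rewrites `e`, which refers to the output columns of `below`, into an
// expression over the input columns of `below`.
Expr Substitute(const Expr& e, const Projection& below) {
  if (e.function.empty()) return below.at(static_cast<std::size_t>(e.field));
  Expr out{e.function, -1, {}};
  for (const Expr& arg : e.args) out.args.push_back(Substitute(arg, below));
  return out;
}

// Collapses "project `below`, then project `above`" into a single projection.
Projection Compose(const Projection& above, const Projection& below) {
  Projection out;
  for (const Expr& e : above) out.push_back(Substitute(e, below));
  return out;
}

std::string ToString(const Expr& e);
std::string Join(const Projection& p) {
  std::string s;
  for (std::size_t i = 0; i < p.size(); ++i) s += (i ? ", " : "") + ToString(p[i]);
  return s;
}
std::string ToString(const Expr& e) {
  if (e.function.empty()) return "field(" + std::to_string(e.field) + ")";
  return e.function + "(" + Join(e.args) + ")";
}
std::string ToString(const Projection& p) { return "[" + Join(p) + "]"; }
```

      Composing the three naive projections gives [field(2), field(4), add(field(3), field(4))]. Composing the sort-key projection onto that result as well turns the key into sub(field(2), add(field(3), field(4))), i.e. B+C gets computed twice, which is why the expected plan keeps the sort key in its own Project node.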

      I suggest the following for future me (or for someone else, in which case feel free to do whatever you think is best). Rather than having the relation FromProto functions return a Declaration, have them return a new class that tracks:

      • declaration: the compute::Declaration up to this point;
      • pending_projection: a vector of Substrait expressions that express the schema Substrait expects in terms of the schema returned by the compute::Declaration, using empty expressions to signal that no change is needed;
      • temporaries: a vector of Substrait expressions that will be needed as temporaries to express the next relation, such as complex expressions encountered in a join condition.
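      One possible shape for this state, sketched with toy types (expressions as strings, a Declaration as a list of node descriptions); the name ConversionState and both helper functions are hypothetical. It assumes an "empty expression" is encoded as std::nullopt and means "declaration column i, unchanged", which is only one reading of the encoding described above.

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Toy types (hypothetical): expressions are plain strings and a "declaration"
// is the list of node descriptions built so far.
using Expr = std::string;
using Declaration = std::vector<std::string>;

struct ConversionState {  // hypothetical name for the proposed class
  // The compute::Declaration built so far (here: node descriptions).
  Declaration declaration;
  // Entry i describes Substrait column i in terms of the declaration's output;
  // std::nullopt is assumed to mean "declaration column i, unchanged".
  std::vector<std::optional<Expr>> pending_projection;
  // Extra columns the next relation needs, e.g. operands of a join condition.
  std::vector<Expr> temporaries;

  // Column index the k-th temporary will occupy once materialized.
  std::size_t TemporaryIndex(std::size_t k) const { return pending_projection.size() + k; }

  // A Project node is needed only if a column is rewritten, dropped or added.
  bool NeedsProject(std::size_t declaration_width) const {
    if (!temporaries.empty() || pending_projection.size() != declaration_width) return true;
    for (const auto& e : pending_projection)
      if (e.has_value()) return true;
    return false;
  }
};
```

      With this encoding, a state whose pending_projection holds only std::nullopt entries, matches the declaration's width, and has no temporaries needs no Project node at all.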

      The class would need functions to:

      • construct from a scanner declaration and the number of columns.
        • post-condition: declaration is set to the given declaration.
        • post-condition: pending_projection.size() equals the number of columns.
        • post-condition: all expressions in pending_projection are simply field references to their own column index.
        • post-condition: temporaries is empty.
      • update the state based on an emit clause.
        • pre-condition: temporaries is empty (otherwise column indices will desync).
        • if the emit clause is a no-op, do nothing.
        • swizzle/remove elements in pending_projection based on the emit clause.
      • update the state based on the body of a ProjectRel.
        • pre-condition: temporaries is empty (otherwise column indices will desync).
        • use some heuristic (still to be decided) to determine whether to commit the pending expressions into a Project node before appending the incoming expressions; being too lazy may result in duplicated subexpressions, while being too eager may yield unnecessary Project nodes. It probably pays to be eager here, unless all pending expressions are some combination of literals and field references.
        • append the incoming expressions to the pending expression vector. Because they reference the Substrait input columns, their field references must first be replaced by the corresponding pending expressions (a no-op right after a commit).
      • force a commit of the pending expressions to a Project node (if any are pending).
        • post-condition: all expressions in pending_projection are simply field references to their own column index.
        • post-condition: temporaries is empty.
      • append a temporary expression (add_temporary()), yielding a FieldRef for the next relation to use; to be used wherever Substrait allows an arbitrary expression but Acero only supports FieldRefs.
        • the expression is appended to the back of temporaries.
        • the returned FieldRef references index pending_projection.size() + i, where i is the index of the added temporary.
      • update the declaration by means of a (Declaration) -> Result<Declaration> closure, which may make use of the FieldRefs returned by previous add_temporary() calls.
        • commit pending expressions to a Project node (if necessary), followed by a Project node that appends the temporaries (if any).
        • clear the list of temporaries.
        • update declaration using the closure.
        • post-condition: all expressions in pending_projection are simply field references to their own column index.
        • post-condition: temporaries is empty.
      • yield the final declaration.
        • pre-condition: temporaries is empty (caller is doing something weird if it's not).
        • commit pending expressions to a Project node (if necessary).
        • return std::move(declaration).
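      Putting the operations together, here is a rough, self-contained sketch over toy types; PendingPlan, Expr, Declaration (a list of node descriptions) and all method names are hypothetical. Choices the list above leaves open are assumptions, marked in the comments: ProjectRel uses the eager heuristic (commit unless all pending expressions are field references; the toy has no literals), temporaries are materialized in a second Project on top of the committed columns so they can reuse them, and the update closure cannot fail (no Result) and keeps the column layout, which holds for a sort but not for a join.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-ins (hypothetical, not the Arrow or Substrait API). An Expr is a call if
// `function` is set, else a reference to column `field`; a Declaration lists nodes, scan first.
struct Expr { std::string function; int field = -1; std::vector<Expr> args; };
using Declaration = std::vector<std::string>;
Expr Field(int i) { return Expr{"", i, {}}; }
Expr Call(std::string f, std::vector<Expr> args) { return Expr{std::move(f), -1, std::move(args)}; }
std::string ToString(const Expr& e);
std::string Join(const std::vector<Expr>& exprs) {
  std::string s;
  for (std::size_t i = 0; i < exprs.size(); ++i) s += (i ? ", " : "") + ToString(exprs[i]);
  return s;
}
std::string ToString(const Expr& e) {
  if (e.function.empty()) return "field(" + std::to_string(e.field) + ")";
  return e.function + "(" + Join(e.args) + ")";
}
class PendingPlan {  // hypothetical name for the proposed class
 public:
  PendingPlan(Declaration scan, std::size_t num_columns)
      : declaration_(std::move(scan)), width_(num_columns) {
    ResetToIdentity(num_columns);
  }
  // Emit clause: Substrait column i becomes current column emit[i].
  void ApplyEmit(const std::vector<int>& emit) {
    assert(temporaries_.empty());
    std::vector<Expr> next;
    for (int i : emit) next.push_back(pending_.at(static_cast<std::size_t>(i)));
    pending_ = std::move(next);
  }
  // ProjectRel body; eager: commit first unless all pending exprs are field refs.
  void ApplyProject(const std::vector<Expr>& exprs) {
    assert(temporaries_.empty());
    bool trivial = true;
    for (const Expr& e : pending_) trivial = trivial && e.function.empty();
    if (!trivial) Commit();
    std::vector<Expr> rewritten;  // exprs only see the input columns: rewrite, then append
    for (const Expr& e : exprs) rewritten.push_back(Substitute(e));
    pending_.insert(pending_.end(), rewritten.begin(), rewritten.end());
  }
  // Temporary in terms of Substrait columns; returns the FieldRef it will end up at.
  Expr AddTemporary(Expr e) {
    temporaries_.push_back(std::move(e));
    return Field(static_cast<int>(pending_.size() + temporaries_.size() - 1));
  }
  // Commit pending exprs, then (design choice) temporaries in a second Project on top.
  void Commit() {
    if (!IsIdentity()) {
      declaration_.push_back("project [" + Join(pending_) + "]");
      width_ = pending_.size();
      ResetToIdentity(width_);
    }
    if (temporaries_.empty()) return;
    std::vector<Expr> all = pending_;  // identity references at this point
    all.insert(all.end(), temporaries_.begin(), temporaries_.end());
    declaration_.push_back("project [" + Join(all) + "]");
    width_ = all.size();
    temporaries_.clear();
  }
  // Assumes the closure cannot fail and keeps the column layout (true for a sort).
  void Update(const std::function<Declaration(Declaration)>& fn) {
    Commit();
    declaration_ = fn(std::move(declaration_));
  }
  Declaration Finish() {
    assert(temporaries_.empty());
    Commit();
    return std::move(declaration_);
  }
 private:
  void ResetToIdentity(std::size_t n) {
    pending_.clear();
    for (std::size_t i = 0; i < n; ++i) pending_.push_back(Field(static_cast<int>(i)));
  }
  bool IsIdentity() const {
    bool identity = pending_.size() == width_;
    for (std::size_t i = 0; identity && i < width_; ++i)
      identity = pending_[i].function.empty() && pending_[i].field == static_cast<int>(i);
    return identity;
  }
  // Rewrites references to Substrait columns into references to declaration columns.
  Expr Substitute(const Expr& e) const {
    if (e.function.empty()) return pending_.at(static_cast<std::size_t>(e.field));
    Expr out{e.function, -1, {}};
    for (const Expr& a : e.args) out.args.push_back(Substitute(a));
    return out;
  }
  Declaration declaration_;
  std::size_t width_;              // number of columns output by declaration_
  std::vector<Expr> pending_;      // Substrait columns in terms of declaration_ columns
  std::vector<Expr> temporaries_;  // expressions in terms of Substrait columns
};
```

      Driving it with the example plan (construct from a scan of 5 columns, ApplyEmit({2, 3, 4}), ApplyProject with add(field(1), field(2)), ApplyEmit({0, 2, 3}), AddTemporary with sub(field(0), field(2)), which returns field(3), Update appending an order_by on that field, ApplyEmit({1, 2}), then Finish) produces the scan followed by exactly the four nodes of the expected plan.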


People

    • Assignee: Unassigned
    • Reporter: Jeroen van Straten (jvanstraten)
    • Votes: 0
    • Watchers: 4
