Flink / FLINK-12575

Introduce planner rules to remove redundant shuffle and collation


Details

    Description

      Exchange and Sort are the heaviest operators. They are created in FlinkExpandConversionRule when a planner rule requires an operator's inputs to satisfy a distribution trait or a collation trait. However, many operators already provide a distribution or a collation: e.g. BatchExecHashAggregate and BatchExecHashJoin provide distribution on their shuffle keys, and BatchExecSortMergeJoin provides both distribution and collation on its join keys. If the provided traits satisfy the required traits, the Exchange or the Sort is redundant.
      For example:

      schema:
      x: a int, b bigint, c varchar
      y: d int, e bigint, f varchar
      t1: a1 int, b1 bigint, c1 varchar
      t2: d1 int, e1 bigint, f1 varchar
      
      sql:
      select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left outer join t2 on a1 = d1 and b1 = e1
      
      The physical plan after the redundant Exchanges and Sorts are removed:
      SortMergeJoin(joinType=[...], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...)
      :- SortMergeJoin(joinType=[...], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...)
      :  :- SortMergeJoin(joinType=[...], where=[AND(=(a, d), =(b, e))], ...)
      :  :  :- Exchange(distribution=[hash[a, b]])
      :  :  :  +- TableSourceScan(table=[[x]], ...)
      :  :  +- Exchange(distribution=[hash[d, e]])
      :  :     +- TableSourceScan(table=[[y]], ...)
      :  +- Exchange(distribution=[hash[a1, b1]])
      :     +- TableSourceScan(table=[[t1]], ...)
      +- Exchange(distribution=[hash[d1, e1]])
         +- TableSourceScan(table=[[t2]], ...)
      

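      In the above physical plan, the Exchanges between the SortMergeJoins are redundant because their shuffle keys are the same (the join conditions make a, d and a1 equal, as well as b, e and b1), and the Sorts on the left-hand side of the top two SortMergeJoins are redundant because their inputs are already sorted.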
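      The trait checks behind this can be sketched as follows. This is a minimal Python model under stated assumptions, not Flink's planner code: HashDistribution, Collation, normalize and operators_to_add are hypothetical names. A hash distribution is treated as satisfied only by the same keys in the same order, after mapping columns that equi-join predicates make equal to one representative; a collation is satisfied when the required keys are a prefix of the provided ones. For brevity, one set of equivalence classes is used for the whole example.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Sequence, Tuple


@dataclass(frozen=True)
class HashDistribution:
    keys: Tuple[str, ...]  # rows are partitioned by hash(keys), e.g. hash[a, b]


@dataclass(frozen=True)
class Collation:
    keys: Tuple[str, ...]  # rows are sorted ascending by keys within each partition


# Classes of columns made equal by equi-join predicates, e.g. {a, d, a1} for a = d and d = a1.
Equivalences = Sequence[FrozenSet[str]]


def normalize(keys: Tuple[str, ...], equiv: Equivalences) -> Tuple[str, ...]:
    """Replace every column by a fixed representative (the smallest name) of its class."""
    return tuple(next((min(cls) for cls in equiv if k in cls), k) for k in keys)


def distribution_satisfies(provided: Optional[HashDistribution],
                           required: Optional[HashDistribution],
                           equiv: Equivalences = ()) -> bool:
    if required is None:  # no requirement at all
        return True
    if provided is None:  # input has an arbitrary distribution
        return False
    # Strict check: the same hash keys in the same order, modulo equivalent columns.
    return normalize(provided.keys, equiv) == normalize(required.keys, equiv)


def collation_satisfies(provided: Collation, required: Collation,
                        equiv: Equivalences = ()) -> bool:
    # The required sort keys must be a prefix of the provided sort keys.
    p, r = normalize(provided.keys, equiv), normalize(required.keys, equiv)
    return p[:len(r)] == r


def operators_to_add(provided_dist: Optional[HashDistribution], provided_coll: Collation,
                     required_dist: Optional[HashDistribution], required_coll: Collation,
                     equiv: Equivalences = ()) -> List[str]:
    """Return the Exchange/Sort nodes (bottom-up) still needed on top of an input."""
    ops: List[str] = []
    if not distribution_satisfies(provided_dist, required_dist, equiv):
        ops.append(f"Exchange(distribution=[hash[{', '.join(required_dist.keys)}]])")
        provided_coll = Collation(())  # a shuffle destroys any existing order
    if not collation_satisfies(provided_coll, required_coll, equiv):
        ops.append(f"Sort(orderBy=[{', '.join(k + ' ASC' for k in required_coll.keys)}])")
    return ops


# Classes from the two inner joins: a = d = a1 and b = e = b1.
EQUIV = [frozenset({"a", "d", "a1"}), frozenset({"b", "e", "b1"})]

# Middle join, left input: output of the bottom join, hashed and sorted on (a, b).
print(operators_to_add(HashDistribution(("a", "b")), Collation(("a", "b")),
                       HashDistribution(("d", "e")), Collation(("d", "e")), EQUIV))  # -> []
# Middle join, right input: a plain scan of t1 still needs a shuffle and a sort.
print(operators_to_add(None, Collation(()),
                       HashDistribution(("a1", "b1")), Collation(("a1", "b1")), EQUIV))
```

      In this sketch a missing sort is reported as a separate Sort node; in the plan above, SortMergeJoin sorts its inputs itself, and leftSorted=[true] appears to mark the side where that sort can be skipped.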
      Note: after an Exchange is removed, the plan may contain a sub-tree like localHashAggregate -> globalHashAggregate with no shuffle in between. The localHashAggregate is then redundant, because its input is already distributed on the grouping keys, and should be removed. The same applies to localRank -> globalRank and localSortAggregate -> globalSortAggregate.
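      The local/global clean-up can be sketched as a bottom-up tree rewrite. This is a hypothetical Python model, not Flink's rule code: Node, TWO_PHASE, remove_redundant_local and the single-phase names HashAggregate, SortAggregate and Rank are illustrative assumptions. Because a global aggregate consumes partial results rather than raw rows, the sketch replaces the local/global pair with one single-phase operator instead of only deleting the local node.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    op: str  # operator name as it would appear in a plan, e.g. "Exchange"
    inputs: List["Node"] = field(default_factory=list)


# Hypothetical table: local operator -> (global operator it feeds, single-phase replacement).
TWO_PHASE = {
    "LocalHashAggregate": ("GlobalHashAggregate", "HashAggregate"),
    "LocalSortAggregate": ("GlobalSortAggregate", "SortAggregate"),
    "LocalRank": ("GlobalRank", "Rank"),
}


def remove_redundant_local(node: Node) -> Node:
    """Bottom-up rewrite: merge a local phase that feeds its global phase with no Exchange in between."""
    node.inputs = [remove_redundant_local(child) for child in node.inputs]
    if len(node.inputs) == 1:
        child = node.inputs[0]
        pair = TWO_PHASE.get(child.op)
        if pair is not None and pair[0] == node.op:
            # The input is already distributed as the global phase needs, so one phase suffices.
            return Node(pair[1], child.inputs)
    return node


def show(node: Node, depth: int = 0) -> List[str]:
    """Render the tree one operator per line, children indented by two spaces."""
    lines = ["  " * depth + node.op]
    for child in node.inputs:
        lines += show(child, depth + 1)
    return lines


# The Exchange between the phases was removed: the join output is already hashed on the group keys.
after_exchange_removal = Node("GlobalHashAggregate", [
    Node("LocalHashAggregate", [Node("SortMergeJoin")])])
# A shuffle is still needed here, so the two-phase aggregate is kept.
still_shuffled = Node("GlobalHashAggregate", [
    Node("Exchange", [Node("LocalHashAggregate", [Node("TableSourceScan")])])])

print("\n".join(show(remove_redundant_local(after_exchange_removal))))
print("\n".join(show(remove_redundant_local(still_shuffled))))
```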

      Another case: the shuffles and collations between multiple OVER windows can also be removed. For example:

      schema:
      MyTable: a int, b int, c varchar
      
      sql:
      SELECT
          COUNT(*) OVER (PARTITION BY c ORDER BY a),
          SUM(a) OVER (PARTITION BY b ORDER BY a),
          RANK() OVER (PARTITION BY c ORDER BY a, c),
          SUM(a) OVER (PARTITION BY b ORDER BY a),
          COUNT(*) OVER (PARTITION BY c ORDER BY c)
       FROM MyTable
      
      The physical plan after the redundant Exchanges and Sorts are removed:
      Calc(select=[...])
      +- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*)  ...])
         +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) ...], window#1=[RANK(*) ...], ...)
            +- Sort(orderBy=[c ASC, a ASC])
               +- Exchange(distribution=[hash[c]])
                  +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a), $SUM0(a) ...], ...)
                     +- Sort(orderBy=[b ASC, a ASC])
                        +- Exchange(distribution=[hash[b]])
                           +- TableSourceScan(table=[[MyTable]], ...)
      

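      The Exchange and Sort between the top two OverAggregates are redundant: the lower one already emits rows distributed by hash[c] and sorted by (c, a), so the shuffle key is the same and the required sort key c is a prefix of the existing order.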
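      The same reasoning for OVER windows can be sketched as below. In this hypothetical Python model (not Flink's code), an OverAggregate requires hash[partition keys] plus a sort on the partition keys followed by the order keys; required_traits and enforcers are illustrative names, and the sketch ignores sort direction and windows without PARTITION BY.

```python
from typing import List, Optional, Sequence, Tuple


def required_traits(partition_by: Sequence[str],
                    order_by: Sequence[str]) -> Tuple[Tuple[str, ...], Tuple[str, ...]]:
    """An OVER window needs hash[partition keys] and a sort on partition keys, then order keys."""
    sort_keys: List[str] = []
    for key in list(partition_by) + list(order_by):
        if key not in sort_keys:  # e.g. ORDER BY c inside PARTITION BY c adds nothing
            sort_keys.append(key)
    return tuple(partition_by), tuple(sort_keys)


def enforcers(provided_hash: Optional[Tuple[str, ...]], provided_sort: Tuple[str, ...],
              partition_by: Sequence[str], order_by: Sequence[str]) -> List[str]:
    """Return the Exchange/Sort nodes (bottom-up) an OverAggregate still needs on top of its input."""
    req_hash, req_sort = required_traits(partition_by, order_by)
    ops: List[str] = []
    if provided_hash != req_hash:
        ops.append(f"Exchange(distribution=[hash[{', '.join(req_hash)}]])")
        provided_sort = ()  # a shuffle destroys any existing order
    if provided_sort[:len(req_sort)] != req_sort:
        ops.append(f"Sort(orderBy=[{', '.join(k + ' ASC' for k in req_sort)}])")
    return ops


# Bottom OverAggregate (PARTITION BY b ORDER BY a) directly on the scan.
print(enforcers(None, (), ["b"], ["a"]))
# Middle OverAggregate (PARTITION BY c ORDER BY a) on top of the b-partitioned one.
print(enforcers(("b",), ("b", "a"), ["c"], ["a"]))
# Top OverAggregate (PARTITION BY c ORDER BY c) on top of the c-partitioned one.
print(enforcers(("c",), ("c", "a"), ["c"], ["c"]))  # -> []
```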

            People

              godfreyhe godfrey he