FLINK-26167

Explicitly set the partitioner for the SQL operators whose shuffle and sort are removed

Details

    Description

      After FLINK-25995, an Exchange (which will be converted to a ForwardForConsecutiveHashPartitioner) is added for nodes that do not need an explicit hash shuffle because their input is already hash-distributed.

      For example:

      WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%') SELECT sum(b1) FROM r group by a1
      

      The plan after FLINK-25995 is:

      Calc(select=[EXPR$0])
      +- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
         +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
            +- Calc(select=[a1, b1])
               +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[left])
                  :- Exchange(distribution=[hash[a1]])
                  :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
                  :     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], fields=[a1, b1, c1])
                  +- Exchange(distribution=[hash[a2]])
                     +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
      

      However, the data between Calc and HashJoin may be out of order (that is, no longer partitioned by hash[a1]) when the two operators run with different parallelism, so an Exchange(distribution=[keep_input_as_is[hash[a1]]]) should also be added between them.
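
      The problem can be illustrated with a small standalone simulation. This is only a sketch and does not use any Flink API: the helper names (hash_partition, rebalance, keys_split_across_subtasks), the sample records, and the parallelism values are all hypothetical. It also assumes that an edge without an explicit partitioner falls back to a round-robin style redistribution when the upstream and downstream parallelism differ, modelled here in a simplified form.

```python
from collections import defaultdict


def hash_partition(records, key_fn, parallelism):
    """Route each record to subtask key % parallelism (stand-in for a hash shuffle)."""
    subtasks = [[] for _ in range(parallelism)]
    for rec in records:
        subtasks[key_fn(rec) % parallelism].append(rec)
    return subtasks


def rebalance(upstream_subtasks, parallelism):
    """Round-robin redistribution that ignores keys (simplified model, an assumption)."""
    downstream = [[] for _ in range(parallelism)]
    i = 0
    for subtask in upstream_subtasks:
        for rec in subtask:
            downstream[i % parallelism].append(rec)
            i += 1
    return downstream


def keys_split_across_subtasks(subtasks, key_fn):
    """Return the sorted keys that appear in more than one subtask."""
    seen = defaultdict(set)
    for idx, subtask in enumerate(subtasks):
        for rec in subtask:
            seen[key_fn(rec)].add(idx)
    return sorted(k for k, idxs in seen.items() if len(idxs) > 1)


def key_of(rec):
    """Grouping key a1 of a hypothetical (a1, b1) record."""
    return rec[0]


# Hypothetical (a1, b1) rows as they leave the join, hash-partitioned on a1
# with parallelism 2.
records = [(1, 10), (1, 20), (2, 30), (2, 40), (3, 50), (3, 60)]
join_output = hash_partition(records, key_of, 2)

# Downstream operator with parallelism 3 and no explicit partitioner:
# rows with the same a1 end up on different subtasks.
after_rebalance = rebalance(join_output, 3)

# Downstream operator with parallelism 3 and an explicit hash partitioner:
# every a1 value stays on a single subtask.
flat = [rec for subtask in join_output for rec in subtask]
after_hash = hash_partition(flat, key_of, 3)

print(keys_split_across_subtasks(after_rebalance, key_of))
print(keys_split_across_subtasks(after_hash, key_of))
```

      In this sketch the first print lists the keys whose rows were scattered across subtasks, and the second print is empty. A downstream HashAggregate that relies on its input already being hashed on a1 would compute partial sums per subtask in the first case, which is why the partitioner needs to be set explicitly on that edge.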

            People

              godfreyhe godfrey he