Flink / FLINK-19939

Remove redundant union from multiple input node

    Description

      Consider the following SQL query and its execution plan.

      WITH
        T1 AS (SELECT COUNT(*) AS cnt FROM x GROUP BY a),
        T2 AS (SELECT COUNT(*) AS cnt FROM y GROUP BY d),
        T3 AS (SELECT b AS cnt FROM x INNER JOIN y ON x.b = y.e)
      SELECT cnt FROM
        (SELECT cnt FROM T1)
        UNION ALL
        (SELECT cnt FROM T2)
        UNION ALL
        (SELECT cnt FROM T3)
      
      
      MultipleInputNode(readOrder=[1,0,0,0], members=[
        Union(all=[true], union=[cnt])
        :- Union(all=[true], union=[cnt])
        :  :- Calc(select=[CAST(cnt) AS cnt])
        :  :  +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])
        :  :     +- [#3] Exchange(distribution=[hash[a]])
        :  +- Calc(select=[CAST(cnt) AS cnt])
        :     +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt])
        :        +- [#4] Exchange(distribution=[hash[d]])
        +- Calc(select=[b AS cnt])
           +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[b, e], build=[right])
              :- [#1] Exchange(distribution=[hash[b]])
              +- [#2] Exchange(distribution=[hash[e]])
      ])
      :- Exchange(distribution=[hash[b]])
      :  +- Calc(select=[b])
      :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
      :- Exchange(distribution=[hash[e]])
      :  +- Calc(select=[e])
      :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
      :- Exchange(distribution=[hash[a]])
      :  +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
      :     +- Calc(select=[a])
      :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
      +- Exchange(distribution=[hash[d]])
         +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
            +- Calc(select=[d])
               +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
      

      The two unions in this multiple input node are actually redundant: a Union(all=[true]) only concatenates its inputs and introduces no Exchange of its own, so the amount of data shuffled would not change even if the unions were moved out of the multiple input node. We should remove such redundant unions from multiple input nodes.
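      A minimal sketch of the idea, assuming the unions sit at the top of the member tree as in the plan above. This is not Flink's actual optimizer code: the `Node` class and the `split_root_unions`, `count_exchanges` and `exchange` helpers are hypothetical. The sketch peels the root unions off the member tree and checks that the number of Exchange (shuffle) inputs is the same before and after.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical, simplified plan node: an operator label plus its inputs."""
    name: str
    inputs: list[Node] = field(default_factory=list)

    @property
    def is_union(self) -> bool:
        return self.name.startswith("Union")

    @property
    def is_exchange(self) -> bool:
        return self.name.startswith("Exchange")


def split_root_unions(root: Node) -> list[Node]:
    """Move Union nodes at the top of a member tree out of the group.

    Returns the sub-trees left behind; each would become its own smaller
    group, while the unions run outside of it.
    """
    if not root.is_union:
        return [root]
    groups: list[Node] = []
    for child in root.inputs:
        groups.extend(split_root_unions(child))
    return groups


def count_exchanges(node: Node) -> int:
    """Count Exchange (shuffle) nodes in the tree rooted at `node`."""
    own = 1 if node.is_exchange else 0
    return own + sum(count_exchanges(child) for child in node.inputs)


def exchange(distribution: str) -> Node:
    # The exchanges are the external inputs [#1]..[#4] of the multiple
    # input node, so they appear here as leaves.
    return Node(f"Exchange({distribution})")


# Member tree of the MultipleInputNode from the plan above, abbreviated.
members = Node("Union", [
    Node("Union", [
        Node("Calc", [Node("HashAggregate(groupBy=a)", [exchange("hash[a]")])]),
        Node("Calc", [Node("HashAggregate(groupBy=d)", [exchange("hash[d]")])]),
    ]),
    Node("Calc", [Node("HashJoin(b = e)", [exchange("hash[b]"), exchange("hash[e]")])]),
])

groups = split_root_unions(members)
print([count_exchanges(g) for g in groups])  # [1, 1, 2]
print(count_exchanges(members))              # 4
```

      In this model the three remaining groups read 1, 1 and 2 exchanges, which add up to the same 4 shuffled inputs as the original node, matching the claim that moving the unions out does not change the amount of data shuffled.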

            People

              Assignee: Caizhi Weng (TsReaper)
              Reporter: Caizhi Weng (TsReaper)
              Votes: 0
              Watchers: 2
