Flink / FLINK-33446

SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation doesn't produce the correct plan


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.17.2, 1.19.0, 1.18.1
    • Fix Version/s: None
    • Component/s: Table SQL / Planner
    • Labels: None

    Description

      Although this test doesn't throw an exception, the optimized plan outputs 3 columns rather than 2 (the top LogicalProject emits $0, $1 and an extra $4):

      LogicalProject(inputs=[0..1], exprs=[[$4]])
      +- LogicalFilter(condition=[IS NULL($5)])
         +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
            :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
            :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
            +- LogicalProject(inputs=[0..2], exprs=[[true]])
               +- LogicalAggregate(group=[{0, 1, 2}])
                  +- LogicalProject(inputs=[0..2])
                     +- LogicalFilter(condition=[IS NULL($3)])
                        +- LogicalJoin(condition=[true], joinType=[left])
                           :- LogicalFilter(condition=[IS NOT NULL($0)])
                           :  +- LogicalProject(exprs=[[+($0, 1)]])
                           :     +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
                           +- LogicalProject(inputs=[0..1], exprs=[[true]])
                              +- LogicalAggregate(group=[{0, 1}])
                                 +- LogicalProject(exprs=[[$3, $0]])
                                    +- LogicalFilter(condition=[AND(=($1, $0), =(CAST($2):BIGINT, $3))])
                                       +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6), CAST(+($0, 6)):BIGINT]])
                                          +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
      
      

      After digging into it, I think the cause is that SubQueryRemoveRule generates a Join node instead of a Correlate node, which makes the decorrelation fail. As a quick fix, I think we should throw an exception to tell users that this is not a supported feature in Flink.
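      The quick fix of rejecting the query instead of returning a wrong plan could look roughly like the check below: any subquery block that references a correlation variable defined two or more levels out (as the innermost filter of the Calcite plan in option 1 does with $cor7.d3) is rejected. This is a hypothetical, self-contained model; Scope, rejectNestedCorrelation and the error message are illustrative names, not Flink or Calcite API, and a real check would presumably have to inspect RexSubQuery / RexCorrelVariable nodes instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model, not Flink or Calcite API: each Scope is one query block that may
// define a correlation variable (e.g. "$cor7") and reference variables in its filters.
public class NestedCorrelationGuard {

    static final class Scope {
        final String definedVar;          // variable this scope defines, or null if none
        final Set<String> referencedVars; // correlation variables used in this scope's filters
        final List<Scope> children = new ArrayList<>();

        Scope(String definedVar, Set<String> referencedVars) {
            this.definedVar = definedVar;
            this.referencedVars = referencedVars;
        }

        Scope add(Scope child) {
            children.add(child);
            return this;
        }
    }

    // Rejects any scope that references a variable defined by an ancestor other than its
    // immediate parent, i.e. a correlation that skips a nesting level.
    static void rejectNestedCorrelation(Scope root) {
        check(root, new ArrayList<>());
    }

    private static void check(Scope scope, List<String> ancestors) {
        // ancestors holds the variables defined on the path from the root; the last entry
        // belongs to the immediate parent, so only the earlier entries count as "nested".
        int outerCount = Math.max(0, ancestors.size() - 1);
        for (String var : scope.referencedVars) {
            if (ancestors.subList(0, outerCount).contains(var)) {
                throw new UnsupportedOperationException(
                        "Nested correlation on " + var + " is not supported yet.");
            }
        }
        ancestors.add(scope.definedVar);
        for (Scope child : scope.children) {
            check(child, ancestors);
        }
        ancestors.remove(ancestors.size() - 1);
    }

    public static void main(String[] args) {
        // Mirrors the correlation structure of the Calcite plan in this issue.
        Scope inner = new Scope(null, Set.of("$cor4", "$cor7"));
        Scope middle = new Scope("$cor4", Set.of("$cor7")).add(inner);
        Scope outer = new Scope("$cor7", Set.of()).add(middle);
        try {
            rejectNestedCorrelation(outer);
        } catch (UnsupportedOperationException e) {
            // prints: Nested correlation on $cor7 is not supported yet.
            System.out.println(e.getMessage());
        }
    }
}
```

      Single-level correlation (the middle block referencing only $cor7, the inner block referencing only $cor4) passes the check; only the reference that skips a level is rejected.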

      There are two possible ways to fix this issue:
      1. Expand subqueries when converting SQL to rel. After experimenting with Calcite, I found that SqlToRelConverter generates the correct plan:

      LogicalProject(inputs=[0..1])
      +- LogicalFilter(condition=[IS NULL($2)])
         +- LogicalCorrelate(correlation=[$cor7], joinType=[left], requiredColumns=[{0, 1}])
            :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
            :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
            +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
               +- LogicalProject(exprs=[[true]])
                  +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
                     +- LogicalCorrelate(correlation=[$cor4], joinType=[left], requiredColumns=[{0}])
                        :- LogicalProject(inputs=[0])
                        :  +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d1, e, f)]]])
                        +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
                           +- LogicalProject(exprs=[[true]])
                              +- LogicalFilter(condition=[AND(=($0, $cor4.d1), =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
                                 +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6)]])
                                    +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
      

      Note that the new plan uses a Correlate node rather than a Join node.

      2. CALCITE-5789 has fixed this problem by removing the nested correlation node.
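      For reference, the result the query should produce can be modeled by evaluating the nested LogicalCorrelate nodes of the Calcite plan in option 1 row by row: the outer loop binds $cor7, the middle loop binds $cor4, and the innermost filter reads columns of both. This is a hypothetical sketch, not Flink or Calcite code; NestedNotExists, evaluate and InnerPredicate are made-up names, and the rows are assumed to be already projected (d2 = 2 + a, d3 = 3 + b from l; d1 from r; 4 + i, 5 + i, 6 + i from t). Only the two outer columns are emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical model, not Flink or Calcite code. Evaluates
//   e WHERE NOT EXISTS (d WHERE p1(e, d) AND NOT EXISTS (t WHERE p2(e, d, t)))
// row by row, the way two nested left LogicalCorrelate + IS NULL filters would.
public class NestedNotExists {

    // Filter of the innermost block; it also sees the outermost row (nested correlation).
    interface InnerPredicate {
        boolean test(long[] outer, long[] middle, long[] inner);
    }

    static List<long[]> evaluate(
            List<long[]> outerRows,
            List<long[]> middleRows,
            List<long[]> innerRows,
            BiPredicate<long[], long[]> middlePredicate,
            InnerPredicate innerPredicate) {
        List<long[]> result = new ArrayList<>();
        for (long[] e : outerRows) {                  // binds $cor7
            boolean middleMatches = false;
            for (long[] d : middleRows) {             // binds $cor4
                if (!middlePredicate.test(e, d)) {
                    continue;
                }
                boolean innerMatches = false;
                for (long[] t : innerRows) {
                    if (innerPredicate.test(e, d, t)) {
                        innerMatches = true;
                        break;
                    }
                }
                if (!innerMatches) {                  // inner NOT EXISTS holds for this d
                    middleMatches = true;
                    break;
                }
            }
            if (!middleMatches) {                     // outer NOT EXISTS holds for this e
                result.add(e);                        // only e's own columns are emitted
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical rows: l = {(1, 1), (5, 2)}, r.d1 = {3}, t.i = {0}, already projected.
        List<long[]> e = List.of(new long[] {3, 4}, new long[] {7, 5});
        List<long[]> d = List.of(new long[] {3});
        List<long[]> t = List.of(new long[] {4, 5, 6});
        List<long[]> rows = evaluate(
                e, d, t,
                (outer, middle) -> middle[0] == outer[0],            // =($0, $cor7.d2)
                (outer, middle, inner) -> inner[0] == middle[0]      // =($0, $cor4.d1)
                        && inner[1] == middle[0]                     // =($1, $cor4.d1)
                        && inner[2] == outer[1]);                    // =(CAST($2):BIGINT, $cor7.d3)
        for (long[] row : rows) {
            System.out.println(Arrays.toString(row)); // prints [7, 5]: two columns per row
        }
    }
}
```

      With these rows, (3, 4) is removed because r holds d1 = 3 and no t row satisfies the innermost filter for it, while (7, 5) has no matching d1 and is kept. As written in the plan, the innermost filter requires both 4 + i and 5 + i to equal d1, which can never hold, so here the middle predicate alone decides the result.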

People

    • Assignee: Unassigned
    • Reporter: Shengkai Fang (fsk119)
    • Votes: 0
    • Watchers: 2