Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.17.2, 1.19.0, 1.18.1
Description
Although this test doesn't throw an exception, the final plan produces 3 columns rather than 2 after optimization.
LogicalProject(inputs=[0..1], exprs=[[$4]])
+- LogicalFilter(condition=[IS NULL($5)])
   +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      +- LogicalProject(inputs=[0..2], exprs=[[true]])
         +- LogicalAggregate(group=[{0, 1, 2}])
            +- LogicalProject(inputs=[0..2])
               +- LogicalFilter(condition=[IS NULL($3)])
                  +- LogicalJoin(condition=[true], joinType=[left])
                     :- LogicalFilter(condition=[IS NOT NULL($0)])
                     :  +- LogicalProject(exprs=[[+($0, 1)]])
                     :     +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
                     +- LogicalProject(inputs=[0..1], exprs=[[true]])
                        +- LogicalAggregate(group=[{0, 1}])
                           +- LogicalProject(exprs=[[$3, $0]])
                              +- LogicalFilter(condition=[AND(=($1, $0), =(CAST($2):BIGINT, $3))])
                                 +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6), CAST(+($0, 6)):BIGINT]])
                                    +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
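To make the "3 columns rather than 2" observation concrete, here is a minimal sketch that counts the output columns of a single LogicalProject line as printed in the plan. It assumes that `inputs=[a..b]` lists forwarded input fields and that `exprs=[[...]]` lists the additional computed expressions. `project_width` and `split_top_level` are hypothetical helper names for illustration only; they work on the printed text and are not part of Flink or Calcite.

```python
import re


def split_top_level(s):
    """Split on commas that are not nested inside (), [] or {}."""
    parts, depth, cur = [], 0, []
    for ch in s:
        if ch in "([{":
            depth += 1
        elif ch in ")]}":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(cur).strip())
            cur = []
        else:
            cur.append(ch)
    tail = "".join(cur).strip()
    if tail:
        parts.append(tail)
    return parts


def project_width(line):
    """Count the output columns of one printed LogicalProject line.

    Assumption: forwarded fields appear as inputs=[a..b] (or inputs=[a])
    and every other output column appears once inside exprs=[[...]].
    """
    width = 0
    m = re.search(r"inputs=\[(\d+)(?:\.\.(\d+))?\]", line)
    if m:
        lo = int(m.group(1))
        hi = int(m.group(2)) if m.group(2) else lo
        width += hi - lo + 1
    m = re.search(r"exprs=\[\[(.*)\]\]", line)
    if m:
        width += len(split_top_level(m.group(1)))
    return width


# Root of the optimized plan from this issue: two forwarded fields plus $4.
print(project_width("LogicalProject(inputs=[0..1], exprs=[[$4]])"))  # 3
# Root of the plan produced by expanding the subquery during SQL-to-rel conversion.
print(project_width("LogicalProject(inputs=[0..1])"))  # 2
```

Under that reading, the extra column comes from the `exprs=[[$4]]` entry on the root project.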
After digging, I think the cause is that SubQueryRemoveRule generates a Join node instead of a Correlate node, which causes decorrelation to fail. As a quick fix, I think we should throw an exception to tell users that this is not a supported feature in Flink.
There are two possible ways to fix this issue:
1. Expand the subquery when converting SQL to rel. After experimenting with Calcite, I found that SqlToRelConverter generates the correct plan.
LogicalProject(inputs=[0..1])
+- LogicalFilter(condition=[IS NULL($2)])
   +- LogicalCorrelate(correlation=[$cor7], joinType=[left], requiredColumns=[{0, 1}])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         +- LogicalProject(exprs=[[true]])
            +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
               +- LogicalCorrelate(correlation=[$cor4], joinType=[left], requiredColumns=[{0}])
                  :- LogicalProject(inputs=[0])
                  :  +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d1, e, f)]]])
                  +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
                     +- LogicalProject(exprs=[[true]])
                        +- LogicalFilter(condition=[AND(=($0, $cor4.d1), =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
                           +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6)]])
                              +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
Note that the new plan uses a Correlate node rather than a Join node.
2. CALCITE-5789 has fixed this problem by removing the nested correlation node.
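The difference between the two plans in this description can also be checked mechanically. The following is a minimal sketch that scans printed plan text for operator names and reports whether any LogicalCorrelate is present. `operator_counts` and `uses_correlate` are hypothetical names for illustration only, and the two strings are short excerpts copied from the plans above, not the full plans.

```python
import re
from collections import Counter


def operator_counts(plan_text):
    """Count each Logical* operator name that appears in a printed plan."""
    return Counter(re.findall(r"\bLogical[A-Za-z]+", plan_text))


def uses_correlate(plan_text):
    """Return True if the printed plan contains at least one LogicalCorrelate."""
    return operator_counts(plan_text)["LogicalCorrelate"] > 0


# Excerpt from the plan produced after SubQueryRemoveRule (Join-based).
join_excerpt = (
    "LogicalFilter(condition=[IS NULL($5)]) "
    "+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])"
)
# Excerpt from the plan produced by the converter (Correlate-based).
correlate_excerpt = (
    "LogicalFilter(condition=[IS NULL($2)]) "
    "+- LogicalCorrelate(correlation=[$cor7], joinType=[left], requiredColumns=[{0, 1}])"
)

print(uses_correlate(join_excerpt))       # False
print(uses_correlate(correlate_excerpt))  # True
```

A text-level check like this is only a debugging aid for comparing plan dumps; a real fix would have to inspect the RelNode tree inside the planner.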
Issue Links
- relates to FLINK-29540 SubQueryAntiJoinTest started to fail after Calcite 1.27 (Open)