Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-29804

Support conversion of Correlate to Join when the correlation variable is unused (behavior introduced in Calcite 1.28)

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.17.0
    • None
    • Table SQL / Planner
    • None

    Description

      This Calcite behavior was introduced in https://issues.apache.org/jira/browse/CALCITE-4668
      and causes failures in a number of tests, such as SetOperatorsTest, CorrelateTest, TemporalTableFunctionJoinTest, and probably some integration tests.

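      An example failure is shown below. After the Correlate is rewritten into a cross join (condition=[true]), the FlinkLogicalTableFunctionScan becomes a standalone join input. There is no rule to convert it from LOGICAL to BATCH_PHYSICAL, so planning fails: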

      org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
      
      FlinkLogicalJoin(condition=[true], joinType=[inner])
      :- FlinkLogicalCalc(select=[c])
      :  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
      +- FlinkLogicalCalc(select=[d], where=[>(e, 20)])
         +- FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
      
      This exception indicates that the query uses an unsupported SQL feature.
      Please check the documentation for the set of currently supported SQL features.
      
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
      	at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
      	at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
      	at scala.collection.Iterator.foreach(Iterator.scala:937)
      	at scala.collection.Iterator.foreach$(Iterator.scala:937)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
      	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
      	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
      	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
      	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
      	at scala.collection.immutable.List.foreach(List.scala:388)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
      	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
      	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:982)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:896)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:658)
      	at org.apache.flink.table.planner.plan.batch.table.CorrelateTest.testCorrelateWithMultiFilterAndWithoutCalcMergeRules(CorrelateTest.scala:106)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
      	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
      	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
      Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
      Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> BATCH_PHYSICAL]
      There is 1 empty subset: rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows
      377:FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
      
      Root: rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]
      Original rel:
      FlinkLogicalJoin(subset=[rel#344:RelSubset#5.LOGICAL.any.[]], condition=[true], joinType=[inner]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.00000001E8 cpu, 1.200000001E9 io, 0.0 network, 0.0 memory}, id = 356
        FlinkLogicalCalc(subset=[rel#354:RelSubset#1.LOGICAL.any.[]], select=[c]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 357
          FlinkLogicalLegacyTableSourceScan(subset=[rel#347:RelSubset#0.LOGICAL.any.[]], table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}, id = 346
        FlinkLogicalCalc(subset=[rel#355:RelSubset#4.LOGICAL.any.[]], select=[d], where=[>(e, 20)]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 363
          FlinkLogicalTableFunctionScan(subset=[rel#350:RelSubset#2.LOGICAL.any.[]], invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 349
      
      Sets:
      Set#6, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)
      	rel#380:RelSubset#6.LOGICAL.any.[], best=rel#346
      		rel#346:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
      	rel#391:RelSubset#6.BATCH_PHYSICAL.any.[], best=rel#390
      		rel#390:BatchPhysicalLegacyTableSourceScan.BATCH_PHYSICAL.any.[](table=[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
      Set#7, type: RecordType(VARCHAR(2147483647) c)
      	rel#382:RelSubset#7.LOGICAL.any.[], best=rel#381
      		rel#381:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#380,select=c), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
      	rel#393:RelSubset#7.BATCH_PHYSICAL.any.[], best=rel#392
      		rel#392:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#391,select=c), rowcount=1.0E8, cumulative cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
      Set#8, type: RecordType(VARCHAR(2147483647) d, INTEGER e)
      	rel#383:RelSubset#8.LOGICAL.any.[], best=rel#377
      		rel#377:FlinkLogicalTableFunctionScan.LOGICAL.any.[](invocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647) d, INTEGER e)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
      	rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
      Set#9, type: RecordType(VARCHAR(2147483647) d)
      	rel#385:RelSubset#9.LOGICAL.any.[], best=rel#384
      		rel#384:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#383,select=d,where=>(e, 20)), rowcount=1.0, cumulative cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
      	rel#396:RelSubset#9.BATCH_PHYSICAL.any.[], best=null
      		rel#395:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#394,select=d,where=>(e, 20)), rowcount=1.0, cumulative cost={inf}
      		rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]), rowcount=1.0, cumulative cost={inf}
      		rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast), rowcount=1.0, cumulative cost={inf}
      	rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[], best=null
      		rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]), rowcount=1.0, cumulative cost={inf}
      		rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast), rowcount=1.0, cumulative cost={inf}
      Set#10, type: RecordType(VARCHAR(2147483647) c, VARCHAR(2147483647) d)
      	rel#387:RelSubset#10.LOGICAL.any.[], best=rel#386
      		rel#386:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner), rowcount=1.0E8, cumulative cost={3.00000002E8 rows, 2.00000002E8 cpu, 3.600000001E9 io, 0.0 network, 0.0 memory}
      	rel#388:RelSubset#10.BATCH_PHYSICAL.any.[], best=null
      		rel#389:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]), rowcount=1.0E8, cumulative cost={inf}
      		rel#400:BatchPhysicalNestedLoopJoin.BATCH_PHYSICAL.any.[](left=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c, d,build=right), rowcount=1.0E8, cumulative cost={inf}
      
      Graphviz:
      digraph G {
      	root [style=filled,label="Root"];
      	subgraph cluster6{
      		label="Set 6 RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)";
      		rel346 [label="rel#346:FlinkLogicalLegacyTableSourceScan\ntable=[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, c\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		rel390 [label="rel#390:BatchPhysicalLegacyTableSourceScan\ntable=[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, c\nrows=1.0E8, cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		subset380 [label="rel#380:RelSubset#6.LOGICAL.any.[]"]
      		subset391 [label="rel#391:RelSubset#6.BATCH_PHYSICAL.any.[]"]
      	}
      	subgraph cluster7{
      		label="Set 7 RecordType(VARCHAR(2147483647) c)";
      		rel381 [label="rel#381:FlinkLogicalCalc\ninput=RelSubset#380,select=c\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		rel392 [label="rel#392:BatchPhysicalCalc\ninput=RelSubset#391,select=c\nrows=1.0E8, cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		subset382 [label="rel#382:RelSubset#7.LOGICAL.any.[]"]
      		subset393 [label="rel#393:RelSubset#7.BATCH_PHYSICAL.any.[]"]
      	}
      	subgraph cluster8{
      		label="Set 8 RecordType(VARCHAR(2147483647) d, INTEGER e)";
      		rel377 [label="rel#377:FlinkLogicalTableFunctionScan\ninvocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647) d, INTEGER e)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		subset383 [label="rel#383:RelSubset#8.LOGICAL.any.[]"]
      		subset394 [label="rel#394:RelSubset#8.BATCH_PHYSICAL.any.[]",color=red]
      	}
      	subgraph cluster9{
      		label="Set 9 RecordType(VARCHAR(2147483647) d)";
      		rel384 [label="rel#384:FlinkLogicalCalc\ninput=RelSubset#383,select=d,where=>(e, 20)\nrows=1.0, cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		rel395 [label="rel#395:BatchPhysicalCalc\ninput=RelSubset#394,select=d,where=>(e, 20)\nrows=1.0, cost={inf}",shape=box]
      		rel399 [label="rel#399:AbstractConverter\ninput=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]\nrows=1.0, cost={inf}",shape=box]
      		rel403 [label="rel#403:BatchPhysicalExchange\ninput=RelSubset#396,distribution=broadcast\nrows=1.0, cost={inf}",shape=box]
      		subset385 [label="rel#385:RelSubset#9.LOGICAL.any.[]"]
      		subset396 [label="rel#396:RelSubset#9.BATCH_PHYSICAL.any.[]"]
      		subset398 [label="rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[]"]
      		subset396 -> subset398;	}
      	subgraph cluster10{
      		label="Set 10 RecordType(VARCHAR(2147483647) c, VARCHAR(2147483647) d)";
      		rel386 [label="rel#386:FlinkLogicalJoin\nleft=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner\nrows=1.0E8, cost={3.00000002E8 rows, 2.00000002E8 cpu, 3.600000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		rel389 [label="rel#389:AbstractConverter\ninput=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
      		rel400 [label="rel#400:BatchPhysicalNestedLoopJoin\nleft=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c, d,build=right\nrows=1.0E8, cost={inf}",shape=box]
      		subset387 [label="rel#387:RelSubset#10.LOGICAL.any.[]"]
      		subset388 [label="rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]"]
      	}
      	root -> subset388;
      	subset380 -> rel346[color=blue];
      	subset391 -> rel390[color=blue];
      	subset382 -> rel381[color=blue]; rel381 -> subset380[color=blue];
      	subset393 -> rel392[color=blue]; rel392 -> subset391[color=blue];
      	subset383 -> rel377[color=blue];
      	subset385 -> rel384[color=blue]; rel384 -> subset383[color=blue];
      	subset396 -> rel395; rel395 -> subset394;
      	subset398 -> rel399; rel399 -> subset396;
      	subset398 -> rel403; rel403 -> subset396;
      	subset387 -> rel386[color=blue]; rel386 -> subset382[color=blue,label="0"]; rel386 -> subset385[color=blue,label="1"];
      	subset388 -> rel389; rel389 -> subset387;
      	subset388 -> rel400; rel400 -> subset393[label="0"]; rel400 -> subset398[label="1"];
      }
      	at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709)
      	at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390)
      	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:533)
      	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
      	... 55 more
      
      

      Attachments

        Issue Links

          Activity

            People

              Assignee: Unassigned
              Reporter: Sergey Nuyanzin
              Votes: 0
              Watchers: 3

              Dates

                Created:
                Updated: