Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.17.0
-
None
-
None
Description
The regression was introduced by https://issues.apache.org/jira/browse/CALCITE-4668
and causes failures in a number of tests, such as SetOperatorsTest, CorrelateTest, and TemporalTableFunctionJoinTest, and probably in some integration tests as well.
An example failure:
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalJoin(condition=[true], joinType=[inner]) :- FlinkLogicalCalc(select=[c]) : +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +- FlinkLogicalCalc(select=[d], where=[>(e, 20)]) +- FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]) This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93) at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45) at scala.collection.immutable.List.foreach(List.scala:388) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:982) at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:896) at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:658) at org.apache.flink.table.planner.plan.batch.table.CorrelateTest.testCorrelateWithMultiFilterAndWithoutCalcMergeRules(CorrelateTest.scala:106) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[]. 
Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> BATCH_PHYSICAL] There is 1 empty subset: rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows 377:FlinkLogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]) Root: rel#388:RelSubset#10.BATCH_PHYSICAL.any.[] Original rel: FlinkLogicalJoin(subset=[rel#344:RelSubset#5.LOGICAL.any.[]], condition=[true], joinType=[inner]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.00000001E8 cpu, 1.200000001E9 io, 0.0 network, 0.0 memory}, id = 356 FlinkLogicalCalc(subset=[rel#354:RelSubset#1.LOGICAL.any.[]], select=[c]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 357 FlinkLogicalLegacyTableSourceScan(subset=[rel#347:RelSubset#0.LOGICAL.any.[]], table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}, id = 346 FlinkLogicalCalc(subset=[rel#355:RelSubset#4.LOGICAL.any.[]], select=[d], where=[>(e, 20)]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 363 FlinkLogicalTableFunctionScan(subset=[rel#350:RelSubset#2.LOGICAL.any.[]], invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 349 Sets: Set#6, type: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c) rel#380:RelSubset#6.LOGICAL.any.[], best=rel#346 rel#346:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} 
rel#391:RelSubset#6.BATCH_PHYSICAL.any.[], best=rel#390 rel#390:BatchPhysicalLegacyTableSourceScan.BATCH_PHYSICAL.any.[](table=[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, c), rowcount=1.0E8, cumulative cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 memory} Set#7, type: RecordType(VARCHAR(2147483647) c) rel#382:RelSubset#7.LOGICAL.any.[], best=rel#381 rel#381:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#380,select=c), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} rel#393:RelSubset#7.BATCH_PHYSICAL.any.[], best=rel#392 rel#392:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#391,select=c), rowcount=1.0E8, cumulative cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 memory} Set#8, type: RecordType(VARCHAR(2147483647) d, INTEGER e) rel#383:RelSubset#8.LOGICAL.any.[], best=rel#377 rel#377:FlinkLogicalTableFunctionScan.LOGICAL.any.[](invocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647) d, INTEGER e)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory} rel#394:RelSubset#8.BATCH_PHYSICAL.any.[], best=null Set#9, type: RecordType(VARCHAR(2147483647) d) rel#385:RelSubset#9.LOGICAL.any.[], best=rel#384 rel#384:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#383,select=d,where=>(e, 20)), rowcount=1.0, cumulative cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory} rel#396:RelSubset#9.BATCH_PHYSICAL.any.[], best=null rel#395:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#394,select=d,where=>(e, 20)), rowcount=1.0, cumulative cost={inf} rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]), rowcount=1.0, cumulative cost={inf} rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast), rowcount=1.0, cumulative cost={inf} 
rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[], best=null rel#399:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]), rowcount=1.0, cumulative cost={inf} rel#403:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#396,distribution=broadcast), rowcount=1.0, cumulative cost={inf} Set#10, type: RecordType(VARCHAR(2147483647) c, VARCHAR(2147483647) d) rel#387:RelSubset#10.LOGICAL.any.[], best=rel#386 rel#386:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner), rowcount=1.0E8, cumulative cost={3.00000002E8 rows, 2.00000002E8 cpu, 3.600000001E9 io, 0.0 network, 0.0 memory} rel#388:RelSubset#10.BATCH_PHYSICAL.any.[], best=null rel#389:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]), rowcount=1.0E8, cumulative cost={inf} rel#400:BatchPhysicalNestedLoopJoin.BATCH_PHYSICAL.any.[](left=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c, d,build=right), rowcount=1.0E8, cumulative cost={inf} Graphviz: digraph G { root [style=filled,label="Root"]; subgraph cluster6{ label="Set 6 RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)"; rel346 [label="rel#346:FlinkLogicalLegacyTableSourceScan\ntable=[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, c\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel390 [label="rel#390:BatchPhysicalLegacyTableSourceScan\ntable=[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]],fields=a, b, c\nrows=1.0E8, cost={1.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset380 [label="rel#380:RelSubset#6.LOGICAL.any.[]"] subset391 [label="rel#391:RelSubset#6.BATCH_PHYSICAL.any.[]"] } subgraph cluster7{ label="Set 7 RecordType(VARCHAR(2147483647) c)"; 
rel381 [label="rel#381:FlinkLogicalCalc\ninput=RelSubset#380,select=c\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel392 [label="rel#392:BatchPhysicalCalc\ninput=RelSubset#391,select=c\nrows=1.0E8, cost={2.0E8 rows, 0.0 cpu, 2.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset382 [label="rel#382:RelSubset#7.LOGICAL.any.[]"] subset393 [label="rel#393:RelSubset#7.BATCH_PHYSICAL.any.[]"] } subgraph cluster8{ label="Set 8 RecordType(VARCHAR(2147483647) d, INTEGER e)"; rel377 [label="rel#377:FlinkLogicalTableFunctionScan\ninvocation=*org.apache.flink.table.planner.utils.TableFunc0*($2),rowType=RecordType(VARCHAR(2147483647) d, INTEGER e)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset383 [label="rel#383:RelSubset#8.LOGICAL.any.[]"] subset394 [label="rel#394:RelSubset#8.BATCH_PHYSICAL.any.[]",color=red] } subgraph cluster9{ label="Set 9 RecordType(VARCHAR(2147483647) d)"; rel384 [label="rel#384:FlinkLogicalCalc\ninput=RelSubset#383,select=d,where=>(e, 20)\nrows=1.0, cost={2.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel395 [label="rel#395:BatchPhysicalCalc\ninput=RelSubset#394,select=d,where=>(e, 20)\nrows=1.0, cost={inf}",shape=box] rel399 [label="rel#399:AbstractConverter\ninput=RelSubset#396,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]\nrows=1.0, cost={inf}",shape=box] rel403 [label="rel#403:BatchPhysicalExchange\ninput=RelSubset#396,distribution=broadcast\nrows=1.0, cost={inf}",shape=box] subset385 [label="rel#385:RelSubset#9.LOGICAL.any.[]"] subset396 [label="rel#396:RelSubset#9.BATCH_PHYSICAL.any.[]"] subset398 [label="rel#398:RelSubset#9.BATCH_PHYSICAL.broadcast.[]"] subset396 -> subset398; } subgraph cluster10{ label="Set 10 RecordType(VARCHAR(2147483647) c, VARCHAR(2147483647) d)"; rel386 
[label="rel#386:FlinkLogicalJoin\nleft=RelSubset#382,right=RelSubset#385,condition=true,joinType=inner\nrows=1.0E8, cost={3.00000002E8 rows, 2.00000002E8 cpu, 3.600000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel389 [label="rel#389:AbstractConverter\ninput=RelSubset#387,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8, cost={inf}",shape=box] rel400 [label="rel#400:BatchPhysicalNestedLoopJoin\nleft=RelSubset#393,right=RelSubset#398,joinType=InnerJoin,where=true,select=c, d,build=right\nrows=1.0E8, cost={inf}",shape=box] subset387 [label="rel#387:RelSubset#10.LOGICAL.any.[]"] subset388 [label="rel#388:RelSubset#10.BATCH_PHYSICAL.any.[]"] } root -> subset388; subset380 -> rel346[color=blue]; subset391 -> rel390[color=blue]; subset382 -> rel381[color=blue]; rel381 -> subset380[color=blue]; subset393 -> rel392[color=blue]; rel392 -> subset391[color=blue]; subset383 -> rel377[color=blue]; subset385 -> rel384[color=blue]; rel384 -> subset383[color=blue]; subset396 -> rel395; rel395 -> subset394; subset398 -> rel399; rel399 -> subset396; subset398 -> rel403; rel403 -> subset396; subset387 -> rel386[color=blue]; rel386 -> subset382[color=blue,label="0"]; rel386 -> subset385[color=blue,label="1"]; subset388 -> rel389; rel389 -> subset387; subset388 -> rel400; rel400 -> subset393[label="0"]; rel400 -> subset398[label="1"]; } at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709) at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:533) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) ... 55 more
Attachments
Issue Links
- relates to
-
CALCITE-5381 Add configuration via property to turn on/off check if correlated in RelBuilder
- Closed