Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.14.3
-
None
-
None
Description
I got a CannotPlanException after overriding isDeterministic() to return false in my table function. For details, see this code:
//代码占位符 public class GetDayTimeEtlSwitch extends TableFunction<Integer> { private boolean status = false; @Override public boolean isDeterministic() { return false; } public void eval() { if (status) { collect(1); } else { if (System.currentTimeMillis() > 1646298908000L) { status = true; collect(1); } else { collect(0); } } } }
Exception stack trace:
//代码占位符 Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]) +- FlinkLogicalJoin(condition=[true], joinType=[left]) :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME]) : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]) +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742) at TestSwitch.main(TestSwitch.java:33) Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE]. 
Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single] There is 1 empty subset: rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows 168:FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])Root: rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE] Original rel: FlinkLogicalSink(subset=[rel#140:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE]], table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 151 FlinkLogicalJoin(subset=[rel#150:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], condition=[true], joinType=[left]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.00000001E8 cpu, 4.800000001E9 io, 0.0 network, 0.0 memory}, id = 149 FlinkLogicalCalc(subset=[rel#148:RelSubset#1.LOGICAL.any.None: 0.[NONE].[NONE]], select=[STUNAME, SUBJECT, SCORE, PROCTIME() AS PROC_TIME]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 153 FlinkLogicalTableSourceScan(subset=[rel#143:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 142 FlinkLogicalTableFunctionScan(subset=[rel#146:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE]], invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 145Sets: Set#5, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE) rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#142 
rel#142:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#183 rel#183:StreamPhysicalTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} Set#6, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME) rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#173 rel#173:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#172,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#185 rel#185:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#184,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf} rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory} rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=rel#194 
rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf} rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory} Set#7, type: RecordType(INTEGER EXPR$0) rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#168 rel#168:FlinkLogicalTableFunctionScan.LOGICAL.any.None: 0.[NONE].[NONE](invocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory} rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=null Set#8, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0) rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#176 rel#176:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#174,right=RelSubset#175,condition=true,joinType=left), rowcount=1.0E8, cumulative cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory} rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null rel#190:StreamPhysicalJoin.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](left=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey), rowcount=1.0E8, cumulative cost={inf} Set#9, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0) rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#178 
rel#178:FlinkLogicalSink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory} rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null rel#182:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf} rel#192:StreamPhysicalSink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative cost={inf}Graphviz: digraph G { root [style=filled,label="Root"]; subgraph cluster5{ label="Set 5 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE)"; rel142 [label="rel#142:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel183 [label="rel#183:StreamPhysicalTableSourceScan\ntable=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset172 [label="rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE]"] subset184 [label="rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] } subgraph cluster6{ label="Set 6 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME)"; rel173 [label="rel#173:FlinkLogicalCalc\ninput=RelSubset#172,select=STUNAME, SUBJECT, SCORE, 
PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel185 [label="rel#185:StreamPhysicalCalc\ninput=RelSubset#184,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel188 [label="rel#188:AbstractConverter\ninput=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box] rel194 [label="rel#194:StreamPhysicalExchange\ninput=RelSubset#186,distribution=single\nrows=1.0E8, cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}",color=blue,shape=box] subset174 [label="rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE]"] subset186 [label="rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] subset187 [label="rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]"] subset186 -> subset187; } subgraph cluster7{ label="Set 7 RecordType(INTEGER EXPR$0)"; rel168 [label="rel#168:FlinkLogicalTableFunctionScan\ninvocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset175 [label="rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE]"] subset189 [label="rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]",color=red] } subgraph cluster8{ label="Set 8 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)"; rel176 [label="rel#176:FlinkLogicalJoin\nleft=RelSubset#174,right=RelSubset#175,condition=true,joinType=left\nrows=1.0E8, cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel190 
[label="rel#190:StreamPhysicalJoin\nleft=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey\nrows=1.0E8, cost={inf}",shape=box] subset177 [label="rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE]"] subset191 [label="rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] } subgraph cluster9{ label="Set 9 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)"; rel178 [label="rel#178:FlinkLogicalSink\ninput=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel182 [label="rel#182:AbstractConverter\ninput=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box] rel192 [label="rel#192:StreamPhysicalSink\ninput=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={inf}",shape=box] subset179 [label="rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE]"] subset181 [label="rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] } root -> subset181; subset172 -> rel142[color=blue]; subset184 -> rel183[color=blue]; subset174 -> rel173[color=blue]; rel173 -> subset172[color=blue]; subset186 -> rel185[color=blue]; rel185 -> subset184[color=blue]; subset187 -> rel188; rel188 -> subset186; subset187 -> rel194[color=blue]; rel194 -> subset186[color=blue]; subset175 -> rel168[color=blue]; subset177 -> rel176[color=blue]; rel176 -> subset174[color=blue,label="0"]; rel176 -> subset175[color=blue,label="1"]; subset191 
-> rel190; rel190 -> subset187[label="0"]; rel190 -> subset189[label="1"]; subset179 -> rel178[color=blue]; rel178 -> subset177[color=blue]; subset181 -> rel182; rel182 -> subset179; subset181 -> rel192; rel192 -> subset191; } at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742) at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69) ... 23 more