Flink / FLINK-26461

Throw CannotPlanException in TableFunction

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.14.3
    • Fix Version/s: None
    • Component/s: Table SQL / API
    • Labels: None

    Description

      I get a CannotPlanException when I override isDeterministic() in a TableFunction so that it returns false. The function is:

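      import org.apache.flink.table.functions.TableFunction;

      // Emits 0 until the wall clock passes a fixed instant, then emits 1 from then on.
      public class GetDayTimeEtlSwitch extends TableFunction<Integer> {
          // Latches to true once the cutoff has passed.
          private boolean status = false;

          // The result depends on the current time, so the function is declared
          // non-deterministic. Returning false here is what triggers the exception.
          @Override
          public boolean isDeterministic() {
              return false;
          }

          public void eval() {
              if (status) {
                  collect(1);
              } else if (System.currentTimeMillis() > 1646298908000L) {
                  status = true;
                  collect(1);
              } else {
                  collect(0);
              }
          }
      }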
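
To make the behavior of eval() easier to see, here is a minimal plain-Java sketch of the same switch logic. It is not Flink code: the class name SwitchSketch, the next() method, and the injectable clock are hypothetical and exist only for illustration. The cutoff value is the one from the report. The point is that the emitted value depends on the wall clock and on internal state, which is why the function is declared non-deterministic.

```java
import java.util.function.LongSupplier;

/** Plain-Java stand-in for the eval() logic above; no Flink classes involved. */
final class SwitchSketch {
    // Cutoff copied from the report (epoch milliseconds).
    static final long CUTOFF_MILLIS = 1646298908000L;

    // Hypothetical injectable clock; the original calls System.currentTimeMillis().
    private final LongSupplier clock;
    private boolean status = false;

    SwitchSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Returns the value the original eval() would pass to collect(). */
    int next() {
        if (status) {
            return 1;
        }
        if (clock.getAsLong() > CUTOFF_MILLIS) {
            status = true; // latch: stays on from now on
            return 1;
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] now = {CUTOFF_MILLIS - 1};
        SwitchSketch s = new SwitchSketch(() -> now[0]);
        System.out.println(s.next()); // before the cutoff: prints 0
        now[0] = CUTOFF_MILLIS + 1;
        System.out.println(s.next()); // after the cutoff: prints 1
        now[0] = CUTOFF_MILLIS - 1;
        System.out.println(s.next()); // clock moved back, latch still set: prints 1
    }
}
```

Two calls with identical (empty) arguments can therefore return different values, so the function cannot honestly be declared deterministic.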

      Exception stack trace:

      Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0])
      +- FlinkLogicalJoin(condition=[true], joinType=[left])
         :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME])
         :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE])
         +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])

      This exception indicates that the query uses an unsupported SQL feature.
      Please check the documentation for the set of currently supported SQL features.
          at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
          at scala.collection.Iterator.foreach(Iterator.scala:937)
          at scala.collection.Iterator.foreach$(Iterator.scala:937)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
          at scala.collection.IterableLike.foreach(IterableLike.scala:70)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
          at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
          at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
          at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
          at TestSwitch.main(TestSwitch.java:33)
      Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
      Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single]
      There is 1 empty subset: rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
      168:FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])

      Root: rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
      Original rel:
      FlinkLogicalSink(subset=[rel#140:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE]], table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 151
        FlinkLogicalJoin(subset=[rel#150:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], condition=[true], joinType=[left]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.00000001E8 cpu, 4.800000001E9 io, 0.0 network, 0.0 memory}, id = 149
          FlinkLogicalCalc(subset=[rel#148:RelSubset#1.LOGICAL.any.None: 0.[NONE].[NONE]], select=[STUNAME, SUBJECT, SCORE, PROCTIME() AS PROC_TIME]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 153
            FlinkLogicalTableSourceScan(subset=[rel#143:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 142
          FlinkLogicalTableFunctionScan(subset=[rel#146:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE]], invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 145

      Sets:
      Set#5, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE)
          rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#142
              rel#142:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
          rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#183
              rel#183:StreamPhysicalTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
      Set#6, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME)
          rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#173
              rel#173:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#172,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
          rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#185
              rel#185:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#184,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
              rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
              rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}
          rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=rel#194
              rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
              rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}
      Set#7, type: RecordType(INTEGER EXPR$0)
          rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#168
              rel#168:FlinkLogicalTableFunctionScan.LOGICAL.any.None: 0.[NONE].[NONE](invocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
          rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=null
      Set#8, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)
          rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#176
              rel#176:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#174,right=RelSubset#175,condition=true,joinType=left), rowcount=1.0E8, cumulative cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}
          rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
              rel#190:StreamPhysicalJoin.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](left=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey), rowcount=1.0E8, cumulative cost={inf}
      Set#9, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)
          rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#178
              rel#178:FlinkLogicalSink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}
          rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
              rel#182:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
              rel#192:StreamPhysicalSink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative cost={inf}

      Graphviz:
      digraph G {
          root [style=filled,label="Root"];
          subgraph cluster5{
              label="Set 5 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE)";
              rel142 [label="rel#142:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel183 [label="rel#183:StreamPhysicalTableSourceScan\ntable=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              subset172 [label="rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE]"]
              subset184 [label="rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
          }
          subgraph cluster6{
              label="Set 6 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME)";
              rel173 [label="rel#173:FlinkLogicalCalc\ninput=RelSubset#172,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel185 [label="rel#185:StreamPhysicalCalc\ninput=RelSubset#184,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel188 [label="rel#188:AbstractConverter\ninput=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
              rel194 [label="rel#194:StreamPhysicalExchange\ninput=RelSubset#186,distribution=single\nrows=1.0E8, cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}",color=blue,shape=box]
              subset174 [label="rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE]"]
              subset186 [label="rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
              subset187 [label="rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]"]
              subset186 -> subset187;    }
          subgraph cluster7{
              label="Set 7 RecordType(INTEGER EXPR$0)";
              rel168 [label="rel#168:FlinkLogicalTableFunctionScan\ninvocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              subset175 [label="rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE]"]
              subset189 [label="rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]",color=red]
          }
          subgraph cluster8{
              label="Set 8 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)";
              rel176 [label="rel#176:FlinkLogicalJoin\nleft=RelSubset#174,right=RelSubset#175,condition=true,joinType=left\nrows=1.0E8, cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel190 [label="rel#190:StreamPhysicalJoin\nleft=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey\nrows=1.0E8, cost={inf}",shape=box]
              subset177 [label="rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE]"]
              subset191 [label="rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
          }
          subgraph cluster9{
              label="Set 9 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)";
              rel178 [label="rel#178:FlinkLogicalSink\ninput=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel182 [label="rel#182:AbstractConverter\ninput=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
              rel192 [label="rel#192:StreamPhysicalSink\ninput=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={inf}",shape=box]
              subset179 [label="rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE]"]
              subset181 [label="rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
          }
          root -> subset181;
          subset172 -> rel142[color=blue];
          subset184 -> rel183[color=blue];
          subset174 -> rel173[color=blue]; rel173 -> subset172[color=blue];
          subset186 -> rel185[color=blue]; rel185 -> subset184[color=blue];
          subset187 -> rel188; rel188 -> subset186;
          subset187 -> rel194[color=blue]; rel194 -> subset186[color=blue];
          subset175 -> rel168[color=blue];
          subset177 -> rel176[color=blue]; rel176 -> subset174[color=blue,label="0"]; rel176 -> subset175[color=blue,label="1"];
          subset191 -> rel190; rel190 -> subset187[label="0"]; rel190 -> subset189[label="1"];
          subset179 -> rel178[color=blue]; rel178 -> subset177[color=blue];
          subset181 -> rel182; rel182 -> subset179;
          subset181 -> rel192; rel192 -> subset191;
      }
          at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742)
          at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520)
          at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
          ... 23 more
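
The dump above is long, and only a few lines identify the node that could not be planned. As a reading aid, the following sketch pulls out the "Missing conversion is" line and every subset reported with best=null. The class and method names are hypothetical, and the line formats are assumed from this one trace rather than from any documented Calcite output format, so treat it as a rough helper, not a parser for planner dumps in general.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for skimming a CannotPlanException message. */
final class PlanDumpScan {
    // Assumed line shape, taken from the trace above:
    // "rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=null"
    private static final Pattern UNPLANNED =
            Pattern.compile("^\\s*(rel#\\d+):RelSubset#(\\d+)\\.(.*), best=null\\s*$");

    private static final String MARKER = "Missing conversion is ";

    /** Returns the text after "Missing conversion is ", or null if absent. */
    static String missingConversion(String dump) {
        for (String line : dump.split("\\R")) {
            int i = line.indexOf(MARKER);
            if (i >= 0) {
                return line.substring(i + MARKER.length()).trim();
            }
        }
        return null;
    }

    /** Lists subsets for which the planner found no implementation (best=null). */
    static List<String> unplannedSubsets(String dump) {
        List<String> out = new ArrayList<>();
        for (String line : dump.split("\\R")) {
            Matcher m = UNPLANNED.matcher(line);
            if (m.matches()) {
                out.add(m.group(1) + " in Set#" + m.group(2) + " [" + m.group(3) + "]");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String dump = String.join("\n",
                "Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> STREAM_PHYSICAL]",
                "    rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#168",
                "    rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=null");
        System.out.println(missingConversion(dump));
        unplannedSubsets(dump).forEach(System.out::println);
    }
}
```

Applied to the trace in this report, the best=null subsets are rel#189 (Set#7), rel#191 (Set#8) and rel#181 (Set#9). Set#7 is the one the exception message calls empty, and it holds only the logical FlinkLogicalTableFunctionScan for GET_SWITCH().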
       

       

          People

            Assignee: Unassigned
            Reporter: Spongebob (SpongebobZ)
            Votes: 0
            Watchers: 2
