org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
FlinkLogicalSink(table=[*anonymous_collect$69*], fields=[b])
+- FlinkLogicalCalc(select=[EXPR$0 AS b])
   +- FlinkLogicalJoin(condition=[true], joinType=[left])
      :- FlinkLogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
      +- FlinkLogicalValues(type=[RecordType(INTEGER EXPR$0)], tuples=[[{ 1 }]])
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
at scala.collection.immutable.List.foreach(List.scala:388)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1723)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:860)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1368)
at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:475)
at org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:308)
at org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:144)
at org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:108)
at org.apache.flink.table.planner.runtime.batch.sql.join.JoinITCase.testUncorrelatedScalar(JoinITCase.scala:1061)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is FlinkLogicalJoin[convention: LOGICAL -> BATCH_PHYSICAL]
There is 1 empty subset: rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows
62538:FlinkLogicalJoin(condition=[true], joinType=[left])
62508:FlinkLogicalValues(subset=[rel#62536:RelSubset#5.LOGICAL.any.[0]], tuples=[[{ 0 }]])
62510:FlinkLogicalValues(subset=[rel#62537:RelSubset#6.LOGICAL.any.[0]], tuples=[[{ 1 }]])
Root: rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[]
Original rel:
FlinkLogicalSink(subset=[rel#62506:RelSubset#4.LOGICAL.any.[]], table=[*anonymous_collect$69*], fields=[b]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62517
FlinkLogicalCalc(subset=[rel#62516:RelSubset#3.LOGICAL.any.[]], select=[EXPR$0 AS b]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62520
FlinkLogicalJoin(subset=[rel#62514:RelSubset#2.LOGICAL.any.[]], condition=[true], joinType=[left]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 5.0 io, 0.0 network, 0.0 memory}, id = 62513
FlinkLogicalValues(subset=[rel#62509:RelSubset#0.LOGICAL.any.[0]], tuples=[[{ 0 }]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62508
FlinkLogicalValues(subset=[rel#62511:RelSubset#1.LOGICAL.any.[0]], tuples=[[{ 1 }]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62510
Sets:
Set#5, type: RecordType(INTEGER ZERO)
rel#62536:RelSubset#5.LOGICAL.any.[0], best=rel#62508
rel#62508:FlinkLogicalValues.LOGICAL.any.[0](type=RecordType(INTEGER ZERO),tuples=[{ 0 }]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
rel#62548:RelSubset#5.BATCH_PHYSICAL.any.[0], best=rel#62547
rel#62547:BatchPhysicalValues.BATCH_PHYSICAL.any.[0](type=RecordType(INTEGER ZERO),tuples=[{ 0 }],values=ZERO), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
Set#6, type: RecordType(INTEGER EXPR$0)
rel#62537:RelSubset#6.LOGICAL.any.[0], best=rel#62510
rel#62510:FlinkLogicalValues.LOGICAL.any.[0](type=RecordType(INTEGER EXPR$0),tuples=[{ 1 }]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
rel#62550:RelSubset#6.BATCH_PHYSICAL.any.[0], best=rel#62549
rel#62549:BatchPhysicalValues.BATCH_PHYSICAL.any.[0](type=RecordType(INTEGER EXPR$0),tuples=[{ 1 }],values=EXPR$0), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
Set#7, type: RecordType(INTEGER ZERO, INTEGER EXPR$0)
rel#62539:RelSubset#7.LOGICAL.any.[], best=rel#62538
rel#62538:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#62536,right=RelSubset#62537,condition=true,joinType=left), rowcount=1.0, cumulative cost={3.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[], best=null
Set#8, type: RecordType(INTEGER b)
rel#62541:RelSubset#8.LOGICAL.any.[], best=rel#62540
rel#62540:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#62539,select=EXPR$0 AS b), rowcount=1.0, cumulative cost={4.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
rel#62553:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
rel#62552:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#62551,select=EXPR$0 AS b), rowcount=1.0, cumulative cost={inf}
Set#9, type: RecordType(INTEGER b)
rel#62544:RelSubset#9.LOGICAL.any.[], best=rel#62543
rel#62543:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#62541,table=*anonymous_collect$69*,fields=b), rowcount=1.0, cumulative cost={5.0 rows, 5.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[], best=null
rel#62546:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#62544,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]), rowcount=1.0, cumulative cost={inf}
rel#62554:BatchPhysicalSink.BATCH_PHYSICAL.any.[](input=RelSubset#62553,table=*anonymous_collect$69*,fields=b), rowcount=1.0, cumulative cost={inf}
Graphviz:
digraph G {
root [style=filled,label="Root"];
subgraph cluster5{
label="Set 5 RecordType(INTEGER ZERO)";
rel62508 [label="rel#62508:FlinkLogicalValues\ntype=RecordType(INTEGER ZERO),tuples=[{ 0 }]\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel62547 [label="rel#62547:BatchPhysicalValues\ntype=RecordType(INTEGER ZERO),tuples=[{ 0 }],values=ZERO\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
subset62536 [label="rel#62536:RelSubset#5.LOGICAL.any.[0]"]
subset62548 [label="rel#62548:RelSubset#5.BATCH_PHYSICAL.any.[0]"]
}
subgraph cluster6{
label="Set 6 RecordType(INTEGER EXPR$0)";
rel62510 [label="rel#62510:FlinkLogicalValues\ntype=RecordType(INTEGER EXPR$0),tuples=[{ 1 }]\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel62549 [label="rel#62549:BatchPhysicalValues\ntype=RecordType(INTEGER EXPR$0),tuples=[{ 1 }],values=EXPR$0\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
subset62537 [label="rel#62537:RelSubset#6.LOGICAL.any.[0]"]
subset62550 [label="rel#62550:RelSubset#6.BATCH_PHYSICAL.any.[0]"]
}
subgraph cluster7{
label="Set 7 RecordType(INTEGER ZERO, INTEGER EXPR$0)";
rel62538 [label="rel#62538:FlinkLogicalJoin\nleft=RelSubset#62536,right=RelSubset#62537,condition=true,joinType=left\nrows=1.0, cost={3.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
subset62539 [label="rel#62539:RelSubset#7.LOGICAL.any.[]"]
subset62551 [label="rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[]",color=red]
}
subgraph cluster8{
label="Set 8 RecordType(INTEGER b)";
rel62540 [label="rel#62540:FlinkLogicalCalc\ninput=RelSubset#62539,select=EXPR$0 AS b\nrows=1.0, cost={4.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel62552 [label="rel#62552:BatchPhysicalCalc\ninput=RelSubset#62551,select=EXPR$0 AS b\nrows=1.0, cost={inf}",shape=box]
subset62541 [label="rel#62541:RelSubset#8.LOGICAL.any.[]"]
subset62553 [label="rel#62553:RelSubset#8.BATCH_PHYSICAL.any.[]"]
}
subgraph cluster9{
label="Set 9 RecordType(INTEGER b)";
rel62543 [label="rel#62543:FlinkLogicalSink\ninput=RelSubset#62541,table=*anonymous_collect$69*,fields=b\nrows=1.0, cost={5.0 rows, 5.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel62546 [label="rel#62546:AbstractConverter\ninput=RelSubset#62544,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0, cost={inf}",shape=box]
rel62554 [label="rel#62554:BatchPhysicalSink\ninput=RelSubset#62553,table=*anonymous_collect$69*,fields=b\nrows=1.0, cost={inf}",shape=box]
subset62544 [label="rel#62544:RelSubset#9.LOGICAL.any.[]"]
subset62545 [label="rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[]"]
}
root -> subset62545;
subset62536 -> rel62508[color=blue];
subset62548 -> rel62547[color=blue];
subset62537 -> rel62510[color=blue];
subset62550 -> rel62549[color=blue];
subset62539 -> rel62538[color=blue]; rel62538 -> subset62536[color=blue,label="0"]; rel62538 -> subset62537[color=blue,label="1"];
subset62541 -> rel62540[color=blue]; rel62540 -> subset62539[color=blue];
subset62553 -> rel62552; rel62552 -> subset62551;
subset62544 -> rel62543[color=blue]; rel62543 -> subset62541[color=blue];
subset62545 -> rel62546; rel62546 -> subset62544;
subset62545 -> rel62554; rel62554 -> subset62553;
}
at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709)
at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:539)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:316)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
... 72 more