  1. Flink
  2. FLINK-20873 Upgrade Calcite version to 1.27
  3. FLINK-29204

JoinITCase#testUncorrelatedScalar fails with "Cannot generate a valid execution plan for the given query" after the Calcite upgrade to 1.27


Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate

    Description

      org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
      
      FlinkLogicalSink(table=[*anonymous_collect$69*], fields=[b])
      +- FlinkLogicalCalc(select=[EXPR$0 AS b])
         +- FlinkLogicalJoin(condition=[true], joinType=[left])
            :- FlinkLogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
            +- FlinkLogicalValues(type=[RecordType(INTEGER EXPR$0)], tuples=[[{ 1 }]])
      
      This exception indicates that the query uses an unsupported SQL feature.
      Please check the documentation for the set of currently supported SQL features.
      
          at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
          at scala.collection.Iterator.foreach(Iterator.scala:937)
          at scala.collection.Iterator.foreach$(Iterator.scala:937)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
          at scala.collection.IterableLike.foreach(IterableLike.scala:70)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
          at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
          at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
          at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
          at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
          at scala.collection.immutable.List.foreach(List.scala:388)
          at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
          at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
          at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
          at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1723)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:860)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1368)
          at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:475)
          at org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:308)
          at org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:144)
          at org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:108)
          at org.apache.flink.table.planner.runtime.batch.sql.join.JoinITCase.testUncorrelatedScalar(JoinITCase.scala:1061)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
          at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
          at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
          at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
          at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
          at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
          at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
          at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
          at org.junit.runners.Suite.runChild(Suite.java:128)
          at org.junit.runners.Suite.runChild(Suite.java:27)
          at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
          at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
          at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
          at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
          at org.junit.rules.RunRules.evaluate(RunRules.java:20)
          at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
          at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
          at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
          at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
          at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
      Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
      Missing conversion is FlinkLogicalJoin[convention: LOGICAL -> BATCH_PHYSICAL]
      There is 1 empty subset: rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows
      62538:FlinkLogicalJoin(condition=[true], joinType=[left])
        62508:FlinkLogicalValues(subset=[rel#62536:RelSubset#5.LOGICAL.any.[0]], tuples=[[{ 0 }]])
        62510:FlinkLogicalValues(subset=[rel#62537:RelSubset#6.LOGICAL.any.[0]], tuples=[[{ 1 }]])
      
      Root: rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[]
      Original rel:
      FlinkLogicalSink(subset=[rel#62506:RelSubset#4.LOGICAL.any.[]], table=[*anonymous_collect$69*], fields=[b]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62517
        FlinkLogicalCalc(subset=[rel#62516:RelSubset#3.LOGICAL.any.[]], select=[EXPR$0 AS b]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62520
          FlinkLogicalJoin(subset=[rel#62514:RelSubset#2.LOGICAL.any.[]], condition=[true], joinType=[left]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 5.0 io, 0.0 network, 0.0 memory}, id = 62513
            FlinkLogicalValues(subset=[rel#62509:RelSubset#0.LOGICAL.any.[0]], tuples=[[{ 0 }]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62508
            FlinkLogicalValues(subset=[rel#62511:RelSubset#1.LOGICAL.any.[0]], tuples=[[{ 1 }]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62510
      
      Sets:
      Set#5, type: RecordType(INTEGER ZERO)
          rel#62536:RelSubset#5.LOGICAL.any.[0], best=rel#62508
              rel#62508:FlinkLogicalValues.LOGICAL.any.[0](type=RecordType(INTEGER ZERO),tuples=[{ 0 }]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
          rel#62548:RelSubset#5.BATCH_PHYSICAL.any.[0], best=rel#62547
              rel#62547:BatchPhysicalValues.BATCH_PHYSICAL.any.[0](type=RecordType(INTEGER ZERO),tuples=[{ 0 }],values=ZERO), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
      Set#6, type: RecordType(INTEGER EXPR$0)
          rel#62537:RelSubset#6.LOGICAL.any.[0], best=rel#62510
              rel#62510:FlinkLogicalValues.LOGICAL.any.[0](type=RecordType(INTEGER EXPR$0),tuples=[{ 1 }]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
          rel#62550:RelSubset#6.BATCH_PHYSICAL.any.[0], best=rel#62549
              rel#62549:BatchPhysicalValues.BATCH_PHYSICAL.any.[0](type=RecordType(INTEGER EXPR$0),tuples=[{ 1 }],values=EXPR$0), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
      Set#7, type: RecordType(INTEGER ZERO, INTEGER EXPR$0)
          rel#62539:RelSubset#7.LOGICAL.any.[], best=rel#62538
              rel#62538:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#62536,right=RelSubset#62537,condition=true,joinType=left), rowcount=1.0, cumulative cost={3.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
          rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[], best=null
      Set#8, type: RecordType(INTEGER b)
          rel#62541:RelSubset#8.LOGICAL.any.[], best=rel#62540
              rel#62540:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#62539,select=EXPR$0 AS b), rowcount=1.0, cumulative cost={4.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
          rel#62553:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
              rel#62552:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#62551,select=EXPR$0 AS b), rowcount=1.0, cumulative cost={inf}
      Set#9, type: RecordType(INTEGER b)
          rel#62544:RelSubset#9.LOGICAL.any.[], best=rel#62543
              rel#62543:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#62541,table=*anonymous_collect$69*,fields=b), rowcount=1.0, cumulative cost={5.0 rows, 5.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
          rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[], best=null
              rel#62546:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#62544,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]), rowcount=1.0, cumulative cost={inf}
              rel#62554:BatchPhysicalSink.BATCH_PHYSICAL.any.[](input=RelSubset#62553,table=*anonymous_collect$69*,fields=b), rowcount=1.0, cumulative cost={inf}
      
      Graphviz:
      digraph G {
          root [style=filled,label="Root"];
          subgraph cluster5{
              label="Set 5 RecordType(INTEGER ZERO)";
              rel62508 [label="rel#62508:FlinkLogicalValues\ntype=RecordType(INTEGER ZERO),tuples=[\{ 0 }]\nrows=1.0, cost=\{1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel62547 [label="rel#62547:BatchPhysicalValues\ntype=RecordType(INTEGER ZERO),tuples=[\{ 0 }],values=ZERO\nrows=1.0, cost=\{1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              subset62536 [label="rel#62536:RelSubset#5.LOGICAL.any.[0]"]
              subset62548 [label="rel#62548:RelSubset#5.BATCH_PHYSICAL.any.[0]"]
          }
          subgraph cluster6{
              label="Set 6 RecordType(INTEGER EXPR$0)";
              rel62510 [label="rel#62510:FlinkLogicalValues\ntype=RecordType(INTEGER EXPR$0),tuples=[\{ 1 }]\nrows=1.0, cost=\{1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel62549 [label="rel#62549:BatchPhysicalValues\ntype=RecordType(INTEGER EXPR$0),tuples=[\{ 1 }],values=EXPR$0\nrows=1.0, cost=\{1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              subset62537 [label="rel#62537:RelSubset#6.LOGICAL.any.[0]"]
              subset62550 [label="rel#62550:RelSubset#6.BATCH_PHYSICAL.any.[0]"]
          }
          subgraph cluster7{
              label="Set 7 RecordType(INTEGER ZERO, INTEGER EXPR$0)";
              rel62538 [label="rel#62538:FlinkLogicalJoin\nleft=RelSubset#62536,right=RelSubset#62537,condition=true,joinType=left\nrows=1.0, cost=\{3.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              subset62539 [label="rel#62539:RelSubset#7.LOGICAL.any.[]"]
              subset62551 [label="rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[]",color=red]
          }
          subgraph cluster8{
              label="Set 8 RecordType(INTEGER b)";
              rel62540 [label="rel#62540:FlinkLogicalCalc\ninput=RelSubset#62539,select=EXPR$0 AS b\nrows=1.0, cost=\{4.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel62552 [label="rel#62552:BatchPhysicalCalc\ninput=RelSubset#62551,select=EXPR$0 AS b\nrows=1.0, cost=\{inf}",shape=box]
              subset62541 [label="rel#62541:RelSubset#8.LOGICAL.any.[]"]
              subset62553 [label="rel#62553:RelSubset#8.BATCH_PHYSICAL.any.[]"]
          }
          subgraph cluster9{
              label="Set 9 RecordType(INTEGER b)";
              rel62543 [label="rel#62543:FlinkLogicalSink\ninput=RelSubset#62541,table=*anonymous_collect$69*,fields=b\nrows=1.0, cost=\{5.0 rows, 5.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
              rel62546 [label="rel#62546:AbstractConverter\ninput=RelSubset#62544,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0, cost=\{inf}",shape=box]
              rel62554 [label="rel#62554:BatchPhysicalSink\ninput=RelSubset#62553,table=*anonymous_collect$69*,fields=b\nrows=1.0, cost=\{inf}",shape=box]
              subset62544 [label="rel#62544:RelSubset#9.LOGICAL.any.[]"]
              subset62545 [label="rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[]"]
          }
          root -> subset62545;
          subset62536 -> rel62508[color=blue];
          subset62548 -> rel62547[color=blue];
          subset62537 -> rel62510[color=blue];
          subset62550 -> rel62549[color=blue];
          subset62539 -> rel62538[color=blue]; rel62538 -> subset62536[color=blue,label="0"]; rel62538 -> subset62537[color=blue,label="1"];
          subset62541 -> rel62540[color=blue]; rel62540 -> subset62539[color=blue];
          subset62553 -> rel62552; rel62552 -> subset62551;
          subset62544 -> rel62543[color=blue]; rel62543 -> subset62541[color=blue];
          subset62545 -> rel62546; rel62546 -> subset62544;
          subset62545 -> rel62554; rel62554 -> subset62553;
      }
          at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709)
          at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:539)
          at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:316)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
          ... 72 more
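
When reading a dump like the "Sets:" section above, the quickest way to find the missing conversion is to look for a subset that has best=null and no member rels under it. The sketch below does that scan in Python over an excerpt of this trace (costs trimmed). The names `DUMP`, `SUBSET`, and `empty_subsets` are hypothetical helpers, not part of Flink or Calcite, and the line format is assumed from this one trace only.

```python
import re

# Excerpt of the "Sets:" dump from this issue, with the cost suffixes trimmed.
DUMP = """\
Set#7, type: RecordType(INTEGER ZERO, INTEGER EXPR$0)
    rel#62539:RelSubset#7.LOGICAL.any.[], best=rel#62538
        rel#62538:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#62536,right=RelSubset#62537,condition=true,joinType=left)
    rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[], best=null
Set#8, type: RecordType(INTEGER b)
    rel#62541:RelSubset#8.LOGICAL.any.[], best=rel#62540
        rel#62540:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#62539,select=EXPR$0 AS b)
    rel#62553:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
        rel#62552:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#62551,select=EXPR$0 AS b)
"""

# Matches a subset header line such as
# "rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[], best=null" (assumed format).
SUBSET = re.compile(r"^(rel#\d+):RelSubset#(\d+)\.(\S+), best=(\S+)$")


def empty_subsets(dump):
    """Return (subset id, set number, traits) for subsets with best=null and no member rels."""
    result = []
    current = None        # the best=null subset currently being scanned, if any
    has_members = False   # whether any rel line followed that subset header
    for raw in dump.splitlines():
        line = raw.strip()
        match = SUBSET.match(line)
        if match or line.startswith("Set#"):
            # A new subset or set starts: close out the previous subset.
            if current and not has_members:
                result.append(current)
            current, has_members = None, False
            if match and match.group(4) == "null":
                current = (match.group(1), int(match.group(2)), match.group(3))
        elif line and current:
            has_members = True
    if current and not has_members:
        result.append(current)
    return result


for subset_id, set_no, traits in empty_subsets(DUMP):
    print(f"{subset_id} in Set#{set_no} has no implementation for {traits}")
```

On this excerpt the scan reports only rel#62551 in Set#7, which agrees with the "There is 1 empty subset" line in the exception message: the BATCH_PHYSICAL subset of Set#8 also has best=null, but it contains a BatchPhysicalCalc whose input is the empty subset, so its infinite cost is a consequence rather than the cause.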
      
       
      
      


          People

            Assignee: Unassigned
            Reporter: Sergey Nuyanzin