Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-15840

ClassCastException is thrown when use tEnv.from for temp/catalog table under Blink planner

    XMLWordPrintableJSON

Details

    Description

      ClassCastException is thrown when use ConnectorDescriptor under Blink planner.
      The exception can be reproduced by the following test:

      @Test
      def testDescriptor(): Unit = {
       this.env = StreamExecutionEnvironment.getExecutionEnvironment
       val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
       this.tEnv = StreamTableEnvironment.create(env, setting)
      
       tEnv.connect(new FileSystem().path("/tmp/input"))
       .withFormat(new OldCsv().field("word", DataTypes.STRING()))
       .withSchema(new Schema().field("word", DataTypes.STRING()))
       .createTemporaryTable("sourceTable")
      
       val sink = new TestingAppendSink
       tEnv.from("sourceTable").toAppendStream[Row].addSink(sink)
       env.execute()
      }
      

      Exceptions:

      java.lang.ClassCastException: org.apache.calcite.plan.ViewExpanders$2 cannot be cast to org.apache.flink.table.planner.calcite.FlinkToRelContext
      
       at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:89)
       at org.apache.calcite.rel.rules.TableScanRule.onMatch(TableScanRule.java:55)
       at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
       at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
       at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
       at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
       at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
       at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
       at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
       at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
       at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
       at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
       at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
       at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
       at scala.collection.immutable.Range.foreach(Range.scala:160)
       at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
       at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
       at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
       at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
       at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:167)
       at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89)
       at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
       at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
       at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
       at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
       at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
       at org.apache.flink.table.planner.runtime.stream.table.CalcITCase.testDescriptor(CalcITCase.scala:541)
      

      It seems we should not cast `context` to `FlinkToRelContext` directly as it could also be an anonymous classes in `org.apache.calcite.plan.ViewExpanders`.

      What do you think?

      Attachments

        Issue Links

          Activity

            People

              lzljs3620320 Jingsong Lee
              sunjincheng121 sunjincheng
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m