Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-15168

Exception is thrown when using kafka source connector with flink planner

    XMLWordPrintableJSON

Details

    Description

      when running the following case using kafka as source connector in flink planner, we will get a RuntimeException:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(1);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);tEnv.connect(new Kafka()
              .version("0.11")
              .topic("user")
              .startFromEarliest()
              .property("zookeeper.connect", "localhost:2181")
              .property("bootstrap.servers", "localhost:9092"))
              .withFormat(new Json()
                      .failOnMissingField(true)
                      .jsonSchema("{" +
                              "  type: 'object'," +
                              "  properties: {" +
                              "    a: {" +
                              "      type: 'string'" +
                              "    }," +
                              "    b: {" +
                              "      type: 'string'" +
                              "    }," +
                              "    c: {" +
                              "      type: 'string'" +
                              "    }," +
                              "    time: {" +
                              "      type: 'string'," +
                              "      format: 'date-time'" +
                              "    }" +
                              "  }" +
                              "}"
                      ))
              .withSchema(new Schema()
                      .field("rowtime", Types.SQL_TIMESTAMP)
                      .rowtime(new Rowtime()
                              .timestampsFromField("time")
                              .watermarksPeriodicBounded(60000))
                      .field("a", Types.STRING)
                      .field("b", Types.STRING)
                      .field("c", Types.STRING))
              .inAppendMode()
              .registerTableSource("source");Table t = tEnv.scan("source").select("a");tEnv.toAppendStream(t, Row.class).print();
      tEnv.execute("test");
      

      The RuntimeException detail:

      Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#26:FlinkLogicalCalc.LOGICAL(input=RelSubset#25,expr#0..3={inputs},a=$t1), Scan(table:[default_catalog, default_database, source], fields:(rowtime, a, b, c), source:Kafka011TableSource(rowtime, a, b, c))]Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#26:FlinkLogicalCalc.LOGICAL(input=RelSubset#25,expr#0..3={inputs},a=$t1), Scan(table:[default_catalog, default_database, source], fields:(rowtime, a, b, c), source:Kafka011TableSource(rowtime, a, b, c))] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:280) at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:199) at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:389) at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:180) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250) at org.apache.flink.table.api.example.batch.JavaBatchWordCount.main(JavaBatchWordCount.java:64)Caused by: org.apache.flink.table.api.ValidationException: Rowtime field 'rowtime' has invalid type LocalDateTime. Rowtime attributes must be of type Timestamp. at org.apache.flink.table.sources.TableSourceUtil$$anonfun$3.apply(TableSourceUtil.scala:114) at org.apache.flink.table.sources.TableSourceUtil$$anonfun$3.apply(TableSourceUtil.scala:92) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) at org.apache.flink.table.sources.TableSourceUtil$.getPhysicalIndexes(TableSourceUtil.scala:307) at org.apache.flink.table.plan.rules.logical.PushProjectIntoTableSourceScanRule.onMatch(PushProjectIntoTableSourceScanRule.scala:46) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208) ... 22 more
      

      Attachments

        Issue Links

          Activity

            People

              dwysakowicz Dawid Wysakowicz
              hxbks2ks Huang Xingbo
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 0.5h
                  0.5h