Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-16108

StreamSQLExample is failed if running in blink planner

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.10.1, 1.11.0
    • Component/s: Table SQL / Planner
    • Labels:
      None

      Description

      StreamSQLExample in flink-example will fail if the specified planner is blink planner. Exception is as following:

      Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
      Query schema: [user: BIGINT, product: STRING, amount: INT]
      Sink schema: [amount: INT, product: STRING, user: BIGINT]
      	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
      	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
      	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
      	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
      	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
      	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
      	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
      	at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)
      
      Process finished with exit code 1
      

      That's because blink planner will also validate the sink schema even if it is come from toAppendStream(). However, the TableSinkUtils#inferSinkPhysicalDataType should derive sink schema from query schema when the requested type is POJO [1], because fields order of POJO is not deterministic.

      [1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237

        Attachments

          Activity

            People

            • Assignee:
              jark Jark Wu
              Reporter:
              jark Jark Wu
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: