FLINK-16108

StreamSQLExample fails when run with the blink planner


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.10.1, 1.11.0
    • Component/s: Table SQL / Planner
    • Labels: None

    Description

      StreamSQLExample in flink-examples fails when the blink planner is specified. The exception is as follows:

      Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
      Query schema: [user: BIGINT, product: STRING, amount: INT]
      Sink schema: [amount: INT, product: STRING, user: BIGINT]
      	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
      	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
      	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
      	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
      	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
      	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
      	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
      	at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)
      
      Process finished with exit code 1
      

      This happens because the blink planner also validates the sink schema, even when the sink comes from toAppendStream(). However, TableSinkUtils#inferSinkPhysicalDataType should derive the sink schema from the query schema when the requested type is a POJO [1], because the field order of a POJO is not deterministic.

      [1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237
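
The mismatch and the proposed direction of the fix can be illustrated without Flink. The following is only a minimal sketch in plain Java, not the code in TableSinkUtils: the class SinkSchemaSketch and its methods are hypothetical names, and schemas are modelled as ordered name-to-type maps holding the values from the exception message above. It shows that a position-based comparison rejects the two schemas, while deriving the sink schema from the query schema by field name makes them line up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink.
public class SinkSchemaSketch {

    // Query schema from the exception message, in query order.
    public static Map<String, String> querySchema() {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("user", "BIGINT");
        schema.put("product", "STRING");
        schema.put("amount", "INT");
        return schema;
    }

    // Sink schema from the exception message, in the order extracted from the POJO.
    public static Map<String, String> pojoSchema() {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("amount", "INT");
        schema.put("product", "STRING");
        schema.put("user", "BIGINT");
        return schema;
    }

    // Order-sensitive comparison. Map.equals ignores ordering, so compare entry lists.
    public static boolean matchesByPosition(Map<String, String> query, Map<String, String> sink) {
        return new ArrayList<>(query.entrySet()).equals(new ArrayList<>(sink.entrySet()));
    }

    // Builds a sink schema that follows the query's field order, matching POJO fields by name.
    public static Map<String, String> deriveSinkSchema(Map<String, String> query, Map<String, String> pojo) {
        if (query.size() != pojo.size()) {
            throw new IllegalArgumentException("Field counts differ: " + query.size() + " vs " + pojo.size());
        }
        Map<String, String> sink = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : query.entrySet()) {
            String pojoType = pojo.get(field.getKey());
            // A missing field yields null here, which also fails the type check.
            if (!field.getValue().equals(pojoType)) {
                throw new IllegalArgumentException(
                        "No matching POJO field for " + field.getKey() + ": " + field.getValue());
            }
            sink.put(field.getKey(), pojoType);
        }
        return sink;
    }

    public static void main(String[] args) {
        Map<String, String> query = querySchema();
        Map<String, String> pojo = pojoSchema();
        System.out.println("positional match with POJO order: " + matchesByPosition(query, pojo));
        Map<String, String> derived = deriveSinkSchema(query, pojo);
        System.out.println("derived sink schema: " + derived);
        System.out.println("positional match after deriving: " + matchesByPosition(query, derived));
    }
}
```

Matching by name still rejects a genuinely incompatible POJO (a missing field or a different type), so the validation keeps its purpose; only the dependence on POJO field order is removed.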


          People

            Assignee: Jark Wu
            Reporter: Jark Wu