Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
Description
StreamSQLExample in flink-example fails when the blink planner is specified. The exception is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
Query schema: [user: BIGINT, product: STRING, amount: INT]
Sink schema: [amount: INT, product: STRING, user: BIGINT]
	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
	at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)
Process finished with exit code 1
This happens because the blink planner also validates the sink schema, even when the sink comes from toAppendStream(). However, TableSinkUtils#inferSinkPhysicalDataType should derive the sink schema from the query schema when the requested type is a POJO [1], because the field order of a POJO is not deterministic.
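The mismatch can be illustrated with a minimal, Flink-free sketch. It is not the planner's actual code: the class SchemaOrderSketch and its methods matchesByPosition, deriveFromQuery, and schema are hypothetical names invented for illustration, and schemas are modelled as ordered name-to-type maps. It assumes the POJO's fields arrive in the order shown in the exception above, and shows why a positional comparison rejects them while a schema derived from the query by field name passes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink.
public class SchemaOrderSketch {

    // Positional check: field i of the query must equal field i of the sink
    // in both name and type.
    static boolean matchesByPosition(Map<String, String> query, Map<String, String> sink) {
        return new ArrayList<>(query.entrySet()).equals(new ArrayList<>(sink.entrySet()));
    }

    // Derive the sink schema from the query: keep the query's field order and
    // look each field up by name among the POJO's fields.
    static Map<String, String> deriveFromQuery(Map<String, String> query,
                                               Map<String, String> pojoFields) {
        Map<String, String> derived = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : query.entrySet()) {
            String pojoType = pojoFields.get(field.getKey());
            if (pojoType == null) {
                throw new IllegalArgumentException("POJO has no field named " + field.getKey());
            }
            if (!pojoType.equals(field.getValue())) {
                throw new IllegalArgumentException("Type mismatch for field " + field.getKey());
            }
            derived.put(field.getKey(), pojoType);
        }
        return derived;
    }

    // Build an ordered schema from alternating name/type arguments.
    static Map<String, String> schema(String... nameTypePairs) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i + 1 < nameTypePairs.length; i += 2) {
            result.put(nameTypePairs[i], nameTypePairs[i + 1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Schemas copied from the exception message above.
        Map<String, String> query = schema("user", "BIGINT", "product", "STRING", "amount", "INT");
        Map<String, String> pojo = schema("amount", "INT", "product", "STRING", "user", "BIGINT");

        // The POJO's own field order does not line up with the query.
        System.out.println(matchesByPosition(query, pojo));

        // A sink schema derived from the query lines up by construction.
        System.out.println(matchesByPosition(query, deriveFromQuery(query, pojo)));
    }
}
```

Matching by name makes the result independent of whatever order the POJO's fields happen to be extracted in, which is the property the description asks for.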