Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-13301

Some PlannerExpression resultType is not consistent with Calcite Type inference

    XMLWordPrintableJSON

Details

    Description

      Some PlannerExpression resultType is not consistent with Calcite Type inference. The problem could be happened when run the following example:

              // prepare source Data
          val testData = new mutable.MutableList[(Int)]
          testData.+=((3))
          val t = env.fromCollection(testData).toTable(tEnv).as('a)
      
          // register a TableSink
          val fieldNames = Array("f0")
          val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT())
      //    val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG())
          val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
          tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
          
          t.select('a.floor()).insertInto("targetTable")
      
          env.execute()
      

      The cause is ResultType of `floor` is LONG_TYPE_INFO, while in Calcite `SqlFloorFunction` infers returnType is the type of the first argument(INT in the above case).
      If I change `fieldTypes` to Array(Types.INT()), the following error will be thrown in compile phase.

      org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [targetTable] do not match.
      Query result schema: [_c0: Long]
      TableSink schema:    [f0: Integer]
      
      	at org.apache.flink.table.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:59)
      	at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:158)
      	at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:157)
      	at scala.Option.map(Option.scala:146)
      	at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:157)
      	at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:129)
      	at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:129)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      

      And If I change `fieldTypes` to Array(Types.LONG()), the other error will be thrown in runtime.

      org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: Long; Actual: Integer
      
      	at org.apache.flink.table.planner.Conversions$$anonfun$generateRowConverterFunction$2.apply(Conversions.scala:103)
      	at org.apache.flink.table.planner.Conversions$$anonfun$generateRowConverterFunction$2.apply(Conversions.scala:98)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      	at org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98)
      	at org.apache.flink.table.planner.DataStreamConversions$.getConversionMapper(DataStreamConversions.scala:135)
      	at org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:91)
      

      Above inconsistent problem also exists in `Floor`, `Ceil`, `Mod` and so on.

      Attachments

        Activity

          People

            Unassigned Unassigned
            jingzhang Jing Zhang
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated: