FLINK-14042

Different RelDataTypes generated for same TemporalTableFunction

Details

    Description

      Given the following table called "foo":

      SELECT event_time, b, c
      FROM X
      WHERE event_time >= <START_TIME> AND event_time < <END_TIME>

      And the following temporal table defined on top of "foo":

      SELECT event_time, b, COLLECT(c) c
      FROM foo
      GROUP BY event_time, b

      I get the following exception:

      Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set:
      set type is RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" MULTISET c) NOT NULL
      expression type is RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" MULTISET NOT NULL c) NOT NULL
      set is rel#17:LogicalCorrelate.NONE(left=HepRelVertex#15,right=HepRelVertex#16,correlation=$cor0,joinType=inner,requiredColumns={0})
      expression is LogicalTemporalTableJoin#23
          at org.apache.calcite.plan.RelOptUtil.verifyTypeEquivalence(RelOptUtil.java:380)
          at org.apache.calcite.plan.hep.HepRuleCall.transformTo(HepRuleCall.java:57)
          at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
          at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:111)
          at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
          at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
          at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
          at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:280)
          at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
          at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
          at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
          at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
          at org.apache.flink.table.api.TableEnvironment.runHepPlannerSimultaneously(TableEnvironment.scala:344)
          at org.apache.flink.table.api.TableEnvironment.optimizeExpandPlan(TableEnvironment.scala:270)
          at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:809)
          at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
          at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
          at org.apache.flink.table.api.Table.insertInto(table.scala:1148)
      

      Digging into the table planner, it appears that when the temporal table is being registered, it goes through `FlinkTypeFactory.buildLogicalRowType`, which uses the following code:

      def buildLogicalRowType(
          fieldNames: Seq[String],
          fieldTypes: Seq[TypeInformation[_]])
        : RelDataType = {
        val logicalRowTypeBuilder = builder
      
        val fields = fieldNames.zip(fieldTypes)
        fields.foreach(f => {
          // time indicators are not nullable
          val nullable = !FlinkTypeFactory.isTimeIndicatorType(f._2)
          logicalRowTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2, nullable))
        })
      
        logicalRowTypeBuilder.build
      }
      

      We can see here that `nullable` is derived from the `isTimeIndicatorType` method: every field is nullable unless it is a time indicator.

      On the other hand, when registering the table that uses the TemporalTableFunction in its query, the row type resolves through `FlinkTableFunctionImpl.getRowType`, which never checks for time indicator types and marks every field as nullable:

      override def getRowType(typeFactory: RelDataTypeFactory,
                              arguments: util.List[AnyRef]): RelDataType = {
        val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
        val builder = flinkTypeFactory.builder
        fieldNames
          .zip(fieldTypes)
          .foreach { f =>
            builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, isNullable = true))
          }
        builder.build
      }
      

      This creates a mismatch between the schema originally registered for the temporal table and the schema inferred when it is used, which results in the exception above.
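
To make the mechanism concrete, here is a minimal, self-contained sketch of how two row-type derivations with different nullability rules can disagree on the same field list. It is a simplified model, not Flink or Calcite code: `FieldInfo`, `RowField`, `logicalRowType`, `functionRowType`, and `mismatchedFields` are hypothetical names introduced only for illustration, and the type labels are plain strings. In this model the two rules diverge on the fields flagged as time indicators; the trace above reports the difference on the MULTISET column `c`, so the sketch illustrates the kind of mismatch rather than reproducing the exact field.

```scala
// Simplified stand-in model; none of these types are Flink or Calcite classes.
object NullabilityMismatchSketch {
  // Hypothetical field description: name, type label, and time-indicator flag.
  final case class FieldInfo(name: String, typeName: String, isTimeIndicator: Boolean)

  // Hypothetical stand-in for one field of a row type.
  final case class RowField(name: String, typeName: String, nullable: Boolean)

  // Same rule as buildLogicalRowType: time indicators are not nullable.
  def logicalRowType(fields: Seq[FieldInfo]): Seq[RowField] =
    fields.map(f => RowField(f.name, f.typeName, nullable = !f.isTimeIndicator))

  // Same rule as getRowType: every field is nullable.
  def functionRowType(fields: Seq[FieldInfo]): Seq[RowField] =
    fields.map(f => RowField(f.name, f.typeName, nullable = true))

  // Names of the fields whose nullability differs between the two derivations.
  def mismatchedFields(fields: Seq[FieldInfo]): Seq[String] =
    logicalRowType(fields).zip(functionRowType(fields)).collect {
      case (a, b) if a.nullable != b.nullable => a.name
    }

  // Fields of the temporal table from the description (type labels are illustrative).
  val fooFields: Seq[FieldInfo] = Seq(
    FieldInfo("event_time", "TIMESTAMP(3)", isTimeIndicator = true),
    FieldInfo("b", "VARCHAR", isTimeIndicator = false),
    FieldInfo("c", "MULTISET", isTimeIndicator = false)
  )

  def main(args: Array[String]): Unit =
    println(mismatchedFields(fooFields))
}
```

Any check that requires the two row types to be structurally equal, as `RelOptUtil.verifyTypeEquivalence` does in the trace, would reject a pair like this as soon as one field's nullability differs.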

      I haven't tried this with other complex types, but it seems like this should happen for any advanced type that wasn't nullable to begin with.

       

            People

              Assignee: Unassigned
              Reporter: Yuval Itzchakov