Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-22861

TIMESTAMPADD + timestamp_ltz type throws CodeGenException

    XML | Word | Printable | JSON

Details

    Description

      Add the following test case to org.apache.flink.table.planner.runtime.batch.sql.CalcITCase to reproduce this issue.

      @Test
      def myTest(): Unit = {
        checkResult("SELECT TIMESTAMPADD(MINUTE, 10, CURRENT_TIMESTAMP)", Seq())
      }
      

      The exception stack is

      org.apache.flink.table.planner.codegen.CodeGenException: Incompatible types of expression and result type. Expression[GeneratedExpression(result$5,isNull$4,
      
      
      isNull$4 = false || false;
      result$5 = null;
      if (!isNull$4) {
        
        result$5 = org.apache.flink.table.data.TimestampData.fromEpochMillis(((org.apache.flink.table.data.TimestampData) queryStartTimestamp).getMillisecond() + ((long) 600000L), ((org.apache.flink.table.data.TimestampData) queryStartTimestamp).getNanoOfMillisecond());
        
      }
      ,TIMESTAMP_LTZ(3) NOT NULL,None)] type is [TIMESTAMP_LTZ(3) NOT NULL], result type is [TIMESTAMP(6) NOT NULL]
      
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateResultExpression$1.apply(ExprCodeGenerator.scala:311)
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateResultExpression$1.apply(ExprCodeGenerator.scala:299)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateResultExpression(ExprCodeGenerator.scala:299)
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateResultExpression(ExprCodeGenerator.scala:255)
      	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:142)
      	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:167)
      	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:50)
      	at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
      	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
      	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
      	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
      	at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
      	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
      	at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:80)
      	at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:79)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:79)
      	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1657)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:797)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1258)
      	at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:106)
      	at org.apache.flink.table.planner.runtime.batch.sql.CalcITCase.myTest2(CalcITCase.scala:81)
      

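      This is because org.apache.calcite.sql.fun.SqlTimestampAddFunction#deduceType always returns a TIMESTAMP type for some time units, even when the operand is a TIMESTAMP_LTZ (as in the exception above, where the expression type is TIMESTAMP_LTZ(3) but the result type is TIMESTAMP(6)). So it seems that we should write our own TIMESTAMPADD function that derives the correct return type.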

      Attachments

        Issue Links

          Activity

            People

              TsReaper Caizhi Weng
              TsReaper Caizhi Weng
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: