Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21565

Support more integer types in TIMESTAMPADD

    XMLWordPrintableJSON

Details

    Description

      At the moment, TIMESTAMPADD does not seem to support SMALLINT or TINYINT types which should be perfectly suitable for auto-conversion (in contrast to BIGINT or floating numbers where I would expect the user to cast it appropriately).

      With the attached file, executing these lines

      CREATE TABLE `flights` (
        `_YEAR` CHAR(4),
        `_MONTH` CHAR(2),
        `_DAY` CHAR(2),
        `_SCHEDULED_DEPARTURE` CHAR(4),
        `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
        `_DEPARTURE_TIME` CHAR(4),
        `DEPARTURE_DELAY` TINYINT,
        `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, `DEPARTURE_DELAY`, TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'))
      ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/kaggle-flight-delay/flights-21565.csv',
        'format' = 'csv'
      );
      
      SELECT * FROM flights;
      

      currently fail with the following exception (similarly for SMALLINT):

      org.apache.flink.table.planner.codegen.CodeGenException: Unsupported casting from TINYINT to INTERVAL SECOND(3).
      	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.numericCasting(ScalarOperatorGens.scala:2352) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateBinaryArithmeticOperator(ScalarOperatorGens.scala:93) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:590) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:529) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:143) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:169) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.Iterator.foreach$(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.IterableLike.foreach(IterableLike.scala:70) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.IterableLike.foreach$(IterableLike.scala:69) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) ~[flink-table_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1321) ~[flink-table_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:328) ~[flink-table_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:287) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:282) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
      	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:542) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
      	... 8 more
      

      Current workaround is to manually cast to INT:

      CREATE TABLE `flights` (
        `_YEAR` CHAR(4),
        `_MONTH` CHAR(2),
        `_DAY` CHAR(2),
        `_SCHEDULED_DEPARTURE` CHAR(4),
        `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
        `_DEPARTURE_TIME` CHAR(4),
        `DEPARTURE_DELAY` SMALLINT,
        `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'))
      ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/kaggle-flight-delay/flights-21563.csv',
        'format' = 'csv'
      );
      

      Attachments

        1. flights-21565.csv
          0.2 kB
          Nico Kruber

        Issue Links

          Activity

            People

              shenzhu0127 Shen Zhu
              nkruber Nico Kruber
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: