Flink / FLINK-12251 Rework the Table API & SQL type system / FLINK-17224

Precision of TIME type does not work correctly



    Description

      The support for precision in the TIME type does not work correctly, causing many different, often cryptic, problems.

      Precision is completely ignored in FlinkTypeFactory:440-446:

            case TIME =>
              if (relDataType.getPrecision > 3) {
                throw new TableException(
                  s"TIME precision is not supported: ${relDataType.getPrecision}")
              }
              // the Blink runner supports precision 3, but to stay consistent with the Flink runner, we set it to 0.
              new TimeType()
      
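      For context, TimeType in flink-table-common carries the precision, and its no-argument constructor falls back to a default precision of 0, which is why the snippet above silently discards whatever precision the DDL declared. A minimal sketch of the difference, assuming only the public TimeType API (the class name TimeTypePrecisionSketch is illustrative):

      import org.apache.flink.table.types.logical.TimeType;

      public class TimeTypePrecisionSketch {
          public static void main(String[] args) {
              // what the factory effectively creates today: the no-arg
              // constructor falls back to the default precision 0
              TimeType dropped = new TimeType();
              System.out.println(dropped.asSerializableString()); // TIME(0)

              // carrying the declared precision over instead
              // (i.e. what relDataType.getPrecision() returned, e.g. 2)
              TimeType kept = new TimeType(2);
              System.out.println(kept.asSerializableString()); // TIME(2)
          }
      }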

      Example problem:

      @Test
      public void testTimeScalarFunction() throws Exception {
      	int nanoOfDay = 10 * 1_000_000;
      	final List<Row> sourceData = Collections.singletonList(
      		Row.of(LocalTime.ofNanoOfDay(nanoOfDay))
      	);
      
      	final List<Row> sinkData = Arrays.asList(
      		Row.of((long) nanoOfDay)
      	);
      
      	TestCollectionTableFactory.reset();
      	TestCollectionTableFactory.initData(sourceData);
      
      	tEnv().sqlUpdate("CREATE TABLE SourceTable(s TIME(2)) WITH ('connector' = 'COLLECTION')");
      	tEnv().sqlUpdate("CREATE TABLE SinkTable(s BIGINT) WITH ('connector' = 'COLLECTION')");
      
      	tEnv().from("SourceTable")
      		.select(call(new TimeScalarFunction(), $("s")))
      		.insertInto("SinkTable");
      	tEnv().execute("Test Job");
      
      	assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
      }
      public static class TimeScalarFunction extends ScalarFunction {
      	public Long eval(@DataTypeHint("TIME(1)") LocalTime time) {
      		return time.toNanoOfDay();
      	}
      }
      

      fails with:

      org.apache.flink.table.api.ValidationException: Invalid function call:
      org$apache$flink$table$planner$runtime$stream$table$FunctionITCase$TimeScalarFunction$a19cd231ba10cbbc0b55ebeda49e2a77(TIME(0))
      
      	at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:198)
      	at org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:73)
      	at org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:486)
      	at org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:277)
      	at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:576)
      	at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:583)
      	at org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule.convert(FunctionDefinitionConvertRule.java:67)
      	at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
      	at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
      	at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
      	at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:681)
      	at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:128)
      	at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$2(QueryOperationConverter.java:487)
      	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
      	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
      	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
      	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
      	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
      	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
      	at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:488)
      	at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152)
      	at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:148)
      	at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
      	at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
      	at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:127)
      	at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
      	at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
      	at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:176)
      	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:188)
      	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
      	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:761)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:753)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:720)
      	at org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.testTimeScalarFunction(FunctionITCase.java:151)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
      	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
      	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
      Caused by: org.apache.flink.table.api.ValidationException: Could not infer an output type for the given arguments.
      	at org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType(TypeInferenceUtil.java:146)
      	at org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnTypeOrError(TypeInferenceReturnInference.java:82)
      	at org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:70)
      	... 74 more
      

      The problem is that after input type inference we apply a cast CAST("a" AS TIME(1)), but when this expression is converted to and from RelDataType it becomes CAST("a" AS TIME(0)). We end up with the wrong type again, which makes the MappingTypeStrategy fail.
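      In other words, the @DataTypeHint("TIME(1)") leads to a type-inference mapping keyed on the argument type TIME(1) (the MappingTypeStrategy mentioned above); after the RelDataType round trip the argument arrives as TIME(0), the lookup finds nothing, and we get "Could not infer an output type for the given arguments." A purely conceptual sketch of that lookup failure (plain Java, not Flink's actual type-inference classes; all names are made up for illustration):

      import java.util.Collections;
      import java.util.Map;
      import java.util.Optional;

      public class OutputTypeLookupSketch {
          // mapping extracted from eval(@DataTypeHint("TIME(1)") LocalTime): TIME(1) -> BIGINT
          private static final Map<String, String> MAPPINGS =
              Collections.singletonMap("TIME(1)", "BIGINT");

          static Optional<String> inferOutputType(String argumentType) {
              // only an exact match succeeds, so TIME(0) resolves to nothing
              return Optional.ofNullable(MAPPINGS.get(argumentType));
          }

          public static void main(String[] args) {
              System.out.println(inferOutputType("TIME(1)")); // Optional[BIGINT]
              System.out.println(inferOutputType("TIME(0)")); // Optional.empty
          }
      }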

    People

      Assignee: Sergey Nuyanzin
      Reporter: Dawid Wysakowicz
      Votes: 1
      Watchers: 8
