Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-24563

Comparing timstamp_ltz with random string throws NullPointerException

    XMLWordPrintableJSON

Details

    Description

      Add the following test case to org.apache.flink.table.planner.runtime.batch.sql.CalcITCase to reproduce this issue.

      @Test
      def myTest(): Unit = {
        val data: Seq[Row] = Seq(row(
          LocalDateTime.of(2021, 10, 15, 0, 0, 0).toInstant(ZoneOffset.UTC)))
        val dataId = TestValuesTableFactory.registerData(data)
        val ddl =
          s"""
             |CREATE TABLE MyTable (
             |  ltz TIMESTAMP_LTZ
             |) WITH (
             |  'connector' = 'values',
             |  'data-id' = '$dataId',
             |  'bounded' = 'true'
             |)
             |""".stripMargin
        tEnv.executeSql(ddl)
      
        checkResult("SELECT ltz = uuid() FROM MyTable", Seq(row(null)))
      }
      

      The exception stack is

      java.lang.RuntimeException: Failed to fetch next result
      
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
      	at java.util.Iterator.forEachRemaining(Iterator.java:115)
      	at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:106)
      	at org.apache.flink.table.planner.runtime.batch.sql.CalcITCase.myTest(CalcITCase.scala:88)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
      	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
      	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
      	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
      Caused by: java.io.IOException: Failed to fetch job execution result
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      	... 41 more
      Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
      	... 43 more
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
      	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
      	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
      	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181)
      	... 43 more
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:678)
      	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
      	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
      	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
      	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
      	at akka.actor.Actor.aroundReceive(Actor.scala:537)
      	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
      	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
      	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
      	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
      	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
      Caused by: java.lang.NullPointerException
      	at BatchExecCalc$7.processElement(Unknown Source)
      	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
      	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
      	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
      	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:231)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:330)
      

      This is because ScalarOperatorGens#generateCast will generate the following code when casting strings to timestamp_ltz.

      result$6 = org.apache.flink.table.data.TimestampData.fromEpochMillis(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.toTimestamp(result$3.toString(), timeZone));
      

      org.apache.flink.table.runtime.functions.SqlDateTimeUtils.toTimestamp might returns null while org.apache.flink.table.data.TimestampData.fromEpochMillis only accepts primitive long values, thus causing this issue.

      What we need to do is to check the result of toTimestamp.

      Attachments

        Issue Links

          Activity

            People

              TsReaper Caizhi Weng
              TsReaper Caizhi Weng
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: