Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-24563

Comparing timstamp_ltz with random string throws NullPointerException

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      Add the following test case to org.apache.flink.table.planner.runtime.batch.sql.CalcITCase to reproduce this issue.

      @Test
      def myTest(): Unit = {
        val data: Seq[Row] = Seq(row(
          LocalDateTime.of(2021, 10, 15, 0, 0, 0).toInstant(ZoneOffset.UTC)))
        val dataId = TestValuesTableFactory.registerData(data)
        val ddl =
          s"""
             |CREATE TABLE MyTable (
             |  ltz TIMESTAMP_LTZ
             |) WITH (
             |  'connector' = 'values',
             |  'data-id' = '$dataId',
             |  'bounded' = 'true'
             |)
             |""".stripMargin
        tEnv.executeSql(ddl)
      
        checkResult("SELECT ltz = uuid() FROM MyTable", Seq(row(null)))
      }
      

      The exception stack is

      java.lang.RuntimeException: Failed to fetch next result
      
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
      	at java.util.Iterator.forEachRemaining(Iterator.java:115)
      	at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
      	at org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:106)
      	at org.apache.flink.table.planner.runtime.batch.sql.CalcITCase.myTest(CalcITCase.scala:88)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
      	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
      	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
      	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
      Caused by: java.io.IOException: Failed to fetch job execution result
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      	... 41 more
      Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
      	... 43 more
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
      	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
      	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
      	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
      	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181)
      	... 43 more
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:678)
      	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
      	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
      	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
      	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
      	at akka.actor.Actor.aroundReceive(Actor.scala:537)
      	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
      	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
      	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
      	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
      	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
      Caused by: java.lang.NullPointerException
      	at BatchExecCalc$7.processElement(Unknown Source)
      	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
      	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
      	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
      	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:231)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:330)
      

      This is because ScalarOperatorGens#generateCast will generate the following code when casting strings to timestamp_ltz.

      result$6 = org.apache.flink.table.data.TimestampData.fromEpochMillis(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.toTimestamp(result$3.toString(), timeZone));
      

      org.apache.flink.table.runtime.functions.SqlDateTimeUtils.toTimestamp might returns null while org.apache.flink.table.data.TimestampData.fromEpochMillis only accepts primitive long values, thus causing this issue.

      What we need to do is to check the result of toTimestamp.

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            TsReaper Caizhi Weng
            TsReaper Caizhi Weng
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment