FLINK-30245

NPE thrown when filtering decimal(18, 4) values after calling DecimalDataUtils.subtract method



    Description

      Reproduce code:

              TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
              tableEnv.executeSql("create table datagen_source1 (disburse_amount int) with ('connector' = 'datagen')");
              tableEnv.executeSql("create table print_sink (disburse_amount Decimal(18,4)) with ('connector' = 'print')");
              tableEnv.executeSql("create view mid as select cast(disburse_amount as Decimal(18,4)) - cast(disburse_amount as Decimal(18,4)) as disburse_amount from datagen_source1");
              tableEnv.executeSql("insert into print_sink select * from mid where disburse_amount > 0").await();

      Exception:

      Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
      	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
      	at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
      	at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
      	at com.shopee.flink.BugExample2.main(BugExample2.java:21)
      Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
      	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)
      	at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)
      	at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:105)
      	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
      	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
      	... 6 more
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
      	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
      	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
      	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
      	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
      	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
      	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
      	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
      	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
      	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
      	at akka.dispatch.OnComplete.internal(Future.scala:300)
      	at akka.dispatch.OnComplete.internal(Future.scala:297)
      	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
      	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
      	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
      	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
      	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
      	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
      	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
      	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
      	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
      	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
      	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
      	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
      	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
      	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
      	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
      	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
      	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
      	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
      	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
      	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
      	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:739)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:716)
      	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
      	at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
      	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
      	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
      	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
      	at akka.actor.Actor.aroundReceive(Actor.scala:537)
      	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
      	... 4 more
      Caused by: java.lang.NullPointerException
      	at org.apache.flink.table.data.DecimalDataUtils.compare(DecimalDataUtils.java:217)
      	at StreamExecCalc$17.processElement_split1(Unknown Source)
      	at StreamExecCalc$17.processElement(Unknown Source)
      	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
      	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
      	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
      	at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:120)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
      

      Root cause:

      For the above SQL, the generated StreamExecCalc contains the following code:

                isNull$299 = externalResult$298 == null;
                result$299 = null;
                if (!isNull$299) {
                  result$299 = externalResult$298;
                }

                isNull$300 = isNull$296 || isNull$299;
                result$301 = null;
                if (!isNull$300) {
                  result$301 = org.apache.flink.table.data.DecimalDataUtils.subtract(result$296, result$299, 19, 4);  // note the precision is 19
                  isNull$300 = (result$301 == null);
                }

                isNull$302 = isNull$300 || false;
                result$303 = false;
                if (!isNull$302) {
                  result$303 = org.apache.flink.table.data.DecimalDataUtils.compare(result$301, ((int) 0)) < 0;
                }


      It seems the precision parameter passed to DecimalDataUtils.subtract is 19 rather than 18, while the precision of the DecimalData operands (result$296, result$299) is still 18, so isCompact() still returns true and the subtraction takes the compact (long-based) path. As a result, the method produces a problematic DecimalData:

      The returned DecimalData is not compact (its precision, 19, exceeds MAX_LONG_DIGITS == 18), yet it is backed only by a long, so its decimalVal field is null. When it is compared with an int value, the non-compact path is taken and decimalVal is dereferenced, which throws the NPE.
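      A minimal standalone sketch of this failure path, assuming flink-table-runtime is on the classpath and that DecimalDataUtils.subtract/compare behave as in the generated code above (the class name and concrete values are illustrative only):

              import org.apache.flink.table.data.DecimalData;
              import org.apache.flink.table.data.DecimalDataUtils;

              public class DecimalSubtractNpeSketch {
                  public static void main(String[] args) {
                      // Two compact DecimalData(18, 4) values, backed only by a long (decimalVal is null).
                      DecimalData a = DecimalData.fromUnscaledLong(10_000L, 18, 4); // 1.0000
                      DecimalData b = DecimalData.fromUnscaledLong(10_000L, 18, 4); // 1.0000

                      // Mirrors the generated code: the target precision is 19 (> MAX_LONG_DIGITS == 18),
                      // but subtract() only checks the operands, which are still compact, so it is
                      // assumed to return a long-backed DecimalData with precision 19 and decimalVal == null.
                      DecimalData diff = DecimalDataUtils.subtract(a, b, 19, 4);

                      // compare() sees precision 19, treats the value as non-compact and dereferences
                      // decimalVal, which is null -> NullPointerException (as in StreamExecCalc$17 above).
                      int cmp = DecimalDataUtils.compare(diff, 0);
                      System.out.println(cmp); // never reached if the NPE is thrown
                  }
              }

      Under this reading, the NPE stems from the mismatch between the precision used to construct the result (19) and the compact, long-only representation it actually carries.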

      We found this on Flink 1.13 and on the latest master branch. Other Flink versions have not been tested, but they most likely contain the same bug.

            People

              Assignee: Wei Zhong
              Reporter: Wei Zhong