Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-30018

NPE error in generated StreamExecCalc

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.16.0
    • None
    • Table SQL / Runtime
    • None

    Description

      Hi, I met a NPE exception running Flink SQL. The exception is

      rg.apache.flink.runtime.taskmanager.Task                    [] - Join[292] -> Calc[293] -> ConstraintEnforcer[294] (10/48)#0 (e628391c0b38d4d22ae62a181a2d7f22_c9cd1581189658451a8850505c8a0007_9_0) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
          at StreamExecCalc$20690.processElement_split881(Unknown Source)
          at StreamExecCalc$20690.processElement(Unknown Source)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
          at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
          at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:334)
          at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:219)
          at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:124)
          at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
          at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
          at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
          at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
          at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
          at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
          at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
          at java.lang.Thread.run(Thread.java:750) 

      `StreamExecCalc$20690.processElement_split881()` function is below:

      private final org.apache.flink.table.data.binary.BinaryStringData str$19838 = org.apache.flink.table.data.binary.BinaryStringData.fromString("N");
      
      void processElement_split881(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
      if (isNull$19828) {
                  out.setNullAt(4);
                } else {
                  out.setNonPrimitiveValue(4, field$19829);
                }
      writer$19903.reset();
      writer$19903.writeBoolean(0, ((boolean) false));
      if (isNull$19830) {
                  writer$19903.setNullAt(1);
                } else {
                  writer$19903.writeLong(1, field$19830);
                }
      isNull$19831 = isNull$19821 || false;
      result$19832 = false;
      if (!isNull$19831) {
                  
                
                result$19832 = field$19821 == ((long) 44571L);
                
                  
                }
      result$19834 = -1L;
      if (result$19832) {
                  
                  if (!isNull$19833) {
                    result$19834 = field$19833;
                  }
                  isNull$19834 = isNull$19833;
                } else {
                  
                  if (!false) {
                    result$19834 = ((long) 0L);
                  }
                  isNull$19834 = false;
                }
      if (isNull$19834) {
                  writer$19903.setNullAt(2);
                } else {
                  writer$19903.writeLong(2, result$19834);
                }
      isNull$19851 = false;
      if (!isNull$19851) {
                if (((org.apache.flink.table.data.binary.BinaryStringData) str$19838).numChars() > 1) {
                result$19852 = ((org.apache.flink.table.data.binary.BinaryStringData) str$19838).substring(0, 1);
                } else {
                if (((org.apache.flink.table.data.binary.BinaryStringData) str$19838).numChars() < 1) {
                
                padLength$19853 = 1 - ((org.apache.flink.table.data.binary.BinaryStringData) str$19838).numChars();
                
                padString$19854 = org.apache.flink.table.data.binary.BinaryStringData.blankString(padLength$19853);
                result$19852 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(((org.apache.flink.table.data.binary.BinaryStringData) str$19838), padString$19854);
                } else {
                result$19852 = ((org.apache.flink.table.data.binary.BinaryStringData) str$19838);
                }
                }
                isNull$19851 = result$19852 == null;
                } else {
                result$19852 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
                }
      isNull$19855 = isNull$19850;
      if (!isNull$19855) {
                if (result$19850.numChars() > 1) {
                result$19856 = result$19850.substring(0, 1);
                } else {
                if (result$19850.numChars() < 1) {
                
                padLength$19857 = 1 - result$19850.numChars();
                
                padString$19858 = org.apache.flink.table.data.binary.BinaryStringData.blankString(padLength$19857);
                result$19856 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(result$19850, padString$19858);
                } else {
                result$19856 = result$19850;
                }
                }
                isNull$19855 = result$19856 == null;
                } else {
                result$19856 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
                }
      result$19859 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
      } 

       

      Could you take a look at this issue? I could not find why NPE is thrown.

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              jackwangcs jackwangcs
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: