Flink / FLINK-20961

Flink throws NullPointerException for tables created from DataStream with no assigned timestamps and watermarks


Details

    Description

      Given the following program:

      //import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy }
      import org.apache.flink.streaming.api.functions.source.SourceFunction
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.streaming.api.watermark.Watermark
      import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
      import org.apache.flink.table.api.$

      import java.time.Instant
      import scala.util.Random

      object BugRepro {
        def text: String =
          s"""
             |{
             |  "s": "hello",
             |  "i": ${Random.nextInt()}
             |}
             |""".stripMargin

        def main(args: Array[String]): Unit = {
          val flink = StreamExecutionEnvironment.createLocalEnvironment()
          val tableEnv = StreamTableEnvironment.create(flink)
          val dataStream = flink
            .addSource {
              new SourceFunction[(Long, String)] {
                var isRunning = true

                override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit =
                  while (isRunning) {
                    val x = (Instant.now().toEpochMilli, text)
                    ctx.collect(x)
                    ctx.emitWatermark(new Watermark(x._1))
                    Thread.sleep(300)
                  }

                override def cancel(): Unit =
                  isRunning = false
              }
            }
      //      .assignTimestampsAndWatermarks(
      //        WatermarkStrategy
      //          .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30))
      //          .withTimestampAssigner {
      //            new SerializableTimestampAssigner[(Long, String)] {
      //              override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long =
      //                element._1
      //            }
      //          }
      //      )

          tableEnv.createTemporaryView("testview", dataStream, $("event_time").rowtime(), $("json_text"))
          val res = tableEnv.sqlQuery("""
                                        |SELECT json_text
                                        |FROM testview
                                        |""".stripMargin)
          tableEnv.executeSql(
            """
              |CREATE TABLE SINK (
              |  json_text STRING
              |)
              |WITH (
              |  'connector' = 'print'
              |)
              |""".stripMargin
          )
          res.executeInsert("SINK").await()
          ()
        }
      }

      

       

      Flink will throw a NullPointerException at runtime:

      Caused by: java.lang.NullPointerException
          at SourceConversion$3.processElement(Unknown Source)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
          at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
      

      This happens because the DataStream never had timestamps assigned to its records. This is the generated code:

      public class SourceConversion$3
          extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

        private final Object[] references;
        private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0;
        org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2);
        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
            new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

        public SourceConversion$3(
            Object[] references,
            org.apache.flink.streaming.runtime.tasks.StreamTask task,
            org.apache.flink.streaming.api.graph.StreamConfig config,
            org.apache.flink.streaming.api.operators.Output output,
            org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
          this.references = references;
          converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0]));
          this.setup(task, config, output);
          if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
            ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
              .setProcessingTimeService(processingTimeService);
          }
        }

        @Override
        public void open() throws Exception {
          super.open();
        }

        @Override
        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
          org.apache.flink.table.data.RowData in1 =
              (org.apache.flink.table.data.RowData) converter$0.toInternal((scala.Tuple2) element.getValue());

          org.apache.flink.table.data.TimestampData result$1;
          boolean isNull$1;
          org.apache.flink.table.data.binary.BinaryStringData field$2;
          boolean isNull$2;
          isNull$2 = in1.isNullAt(1);
          field$2 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$2) {
            field$2 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
          }

          ctx.element = element;

          result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp());
          if (result$1 == null) {
            throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
              "proper TimestampAssigner is defined and the stream environment uses the EventTime " +
              "time characteristic.");
          }
          isNull$1 = false;
          if (isNull$1) {
            out.setField(0, null);
          } else {
            out.setField(0, result$1);
          }

          if (isNull$2) {
            out.setField(1, null);
          } else {
            out.setField(1, field$2);
          }

          output.collect(outElement.replace(out));
          ctx.element = null;
        }

        @Override
        public void close() throws Exception {
          super.close();
        }
      }
      

      The important line is here:

      result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp());
      if (result$1 == null) {
        throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
          "proper TimestampAssigner is defined and the stream environment uses the EventTime " +
          "time characteristic.");
      }

      `ctx.timestamp()` returns `null` when no timestamp assigner was configured, while `TimestampData.fromEpochMillis` takes a primitive `long`, so the implicit unboxing of the null `Long` throws the `NullPointerException` before the `result$1 == null` check can ever run; that check is dead code. The actual check should be:

      if (!ctx.hasTimestamp) {
        throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic.");
      }
      
      result$1 = TimestampData.fromEpochMillis(ctx.timestamp());
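
      The unboxing failure can be reproduced without Flink. A minimal sketch, where `timestamp()`, `hasTimestamp()`, and `fromEpochMillis()` are hypothetical stand-ins mimicking the context and `TimestampData` signatures:

      ```java
      // Minimal sketch, no Flink dependency: shows why the generated null
      // check is dead code, and why a hasTimestamp()-style guard works.
      public class RowtimeNpeDemo {

          // Stands in for ctx.timestamp(): boxed, null when no assigner was set.
          static Long timestamp() {
              return null;
          }

          // Stands in for ctx.hasTimestamp() (hypothetical name).
          static boolean hasTimestamp() {
              return timestamp() != null;
          }

          // Stands in for TimestampData.fromEpochMillis(long): primitive parameter.
          static String fromEpochMillis(long millis) {
              return "ts=" + millis;
          }

          public static void main(String[] args) {
              // 1) Current codegen: unboxing null Long -> long throws the NPE
              //    here, so a later "result == null" check can never fire.
              try {
                  String result = fromEpochMillis(timestamp());
                  System.out.println("unreachable: " + result);
              } catch (NullPointerException e) {
                  System.out.println("NPE thrown during unboxing, before any null check");
              }

              // 2) Proposed codegen: guard first, then unbox safely.
              if (!hasTimestamp()) {
                  System.out.println("Rowtime timestamp is null. Define a proper TimestampAssigner.");
              } else {
                  System.out.println(fromEpochMillis(timestamp()));
              }
          }
      }
      ```

      The guard must run before the call, because once the null `Long` reaches the primitive parameter the exception has already been thrown.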

       

      People

        Yuval Itzchakov