Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Fixed
-
1.12.0
Description
Given the following program:
//import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy } import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{$, AnyWithOperations} import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction} import java.time.Instant object BugRepro { def text: String = s""" |{ | "s": "hello", | "i": ${Random.nextInt()} |} |""".stripMargin def main(args: Array[String]): Unit = { val flink = StreamExecutionEnvironment.createLocalEnvironment() val tableEnv = StreamTableEnvironment.create(flink) val dataStream = flink .addSource { new SourceFunction[(Long, String)] { var isRunning = true override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = while (isRunning) { val x = (Instant.now().toEpochMilli, text) ctx.collect(x) ctx.emitWatermark(new Watermark(x._1)) Thread.sleep(300) } override def cancel(): Unit = isRunning = false } } // .assignTimestampsAndWatermarks( // WatermarkStrategy // .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30)) // .withTimestampAssigner { // new SerializableTimestampAssigner[(Long, String)] { // override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = // element._1 // } // } // ) // tableEnv.createTemporaryView("testview", dataStream, $("event_time").rowtime(), $("json_text")) val res = tableEnv.sqlQuery(""" |SELECT json_text |FROM testview |""".stripMargin) val sink = tableEnv.executeSql( """ |CREATE TABLE SINK ( | json_text STRING |) |WITH ( | 'connector' = 'print' |) |""".stripMargin ) res.executeInsert("SINK").await() () } res.executeInsert("SINK").await()
Flink will throw a NullPointerException at runtime:
Caused by: java.lang.NullPointerExceptionCaused by: java.lang.NullPointerException at SourceConversion$3.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
This is due to the fact that the DataStream did not assign a timestamp to the underlying source. This is the generated code:
public class SourceConversion$3 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references; private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0; org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2); private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public SourceConversion$3( Object[] references, org.apache.flink.streaming.runtime.tasks.StreamTask task, org.apache.flink.streaming.api.graph.StreamConfig config, org.apache.flink.streaming.api.operators.Output output, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception { this.references = references; converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0])); this.setup(task, config, output); if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) { ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) .setProcessingTimeService(processingTimeService); } } @Override public void open() throws Exception { super.open(); } @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$0.toInternal((scala.Tuple2) element.getValue()); org.apache.flink.table.data.TimestampData result$1; boolean isNull$1; org.apache.flink.table.data.binary.BinaryStringData field$2; boolean isNull$2; isNull$2 = in1.isNullAt(1); field$2 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$2) { field$2 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)); } ctx.element = element; result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic."); } isNull$1 = false; if (isNull$1) { out.setField(0, null); } else { out.setField(0, result$1); } if (isNull$2) { out.setField(1, null); } else { out.setField(1, field$2); } output.collect(outElement.replace(out)); ctx.element = null; } @Override public void close() throws Exception { super.close(); } }
The important line is here:
result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic.");
`ctx.timestamp` returns null in case no timestamp assigner was created, and `TimestampData.fromEpochMillis` expects a primitive `long`, so a deference fails. The actual check should be:
if (!ctx.hasTimestamp) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic."); } result$1 = TimestampData.fromEpochMillis(ctx.timestamp());