Details
- Type: Improvement
- Status: Resolved
- Priority: Minor
- Resolution: Done
None
Description
In our project we observed an issue when the stop-with-savepoint operation is called too early, 4-6 seconds after a job has started.
We use the Kinesis EFO consumer in our application. It appears that the KinesisDataFetcher takes some time to initialize, so the FlinkKinesisConsumer#run(SourceContext<T>) method is slow to start. In some tests we call stop-with-savepoint 4-6 seconds after the job has started and get a NullPointerException in FlinkKinesisConsumer#close():
java.lang.NullPointerException: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421) ~[classes/:?]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[classes/:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[classes/:?]
    at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:876) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:759) ~[classes/:?]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[classes/:?]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[classes/:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[classes/:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) ~[classes/:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
We were able to reproduce this issue in FlinkKinesisITCase, even with a standard KDS consumer. The fix is an additional non-null check in the consumer's close(). We'll also add an integration test for it.
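To illustrate the kind of guard meant here, below is a minimal, self-contained sketch. It is not the actual FlinkKinesisConsumer code: SketchSource and SketchFetcher are hypothetical stand-ins, and the only assumption carried over from the report is that the fetcher field is still null when close() runs before run() has finished initializing it.

```java
// Hypothetical sketch of a null-safe close(); not the real Flink connector code.
class NullSafeCloseSketch {

    /** Hypothetical stand-in for a fetcher that is only created inside run(). */
    static final class SketchFetcher {
        private volatile boolean running = true;

        void shutdown() {
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }

    /** Hypothetical source whose close() may be called before run() created the fetcher. */
    static final class SketchSource {
        // Stays null until run() gets far enough to initialize it.
        private volatile SketchFetcher fetcher;

        void run() {
            fetcher = new SketchFetcher();
        }

        void close() {
            // Read the field once so the null check and the call see the same reference.
            SketchFetcher current = fetcher;
            if (current != null) {
                current.shutdown();
            }
        }

        SketchFetcher fetcher() {
            return fetcher;
        }
    }

    public static void main(String[] args) {
        // close() before run(): without the null check this would throw an NPE.
        SketchSource early = new SketchSource();
        early.close();
        System.out.println("early close completed, fetcher created: " + (early.fetcher() != null));

        // Normal order: run() creates the fetcher, close() shuts it down.
        SketchSource started = new SketchSource();
        started.run();
        started.close();
        System.out.println("fetcher still running after close: " + started.fetcher().isRunning());
    }
}
```

Both calls to close() return normally; only the second one has a fetcher to shut down.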
Issue Links
- relates to FLINK-29324 Calling Kinesis connector close method before subtask starts running results in NPE (Resolved)
- links to