Flink / FLINK-30224

Add IT Case for NPE in FlinkKinesisConsumer's close() method

    Description

      In our project we observed an issue when stop-with-savepoint is triggered too early, i.e. 4-6 seconds after the job has started.

      Our application uses the Kinesis enhanced fan-out (EFO) consumer. It appears that the KinesisDataFetcher takes some time to initialize, so FlinkKinesisConsumer#run(SourceContext<T>) is slow to get going. In some tests we call stop-with-savepoint 4-6 seconds after the job has started, and FlinkKinesisConsumer#close() throws a NullPointerException:

      java.lang.NullPointerException: null
              at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421) ~[classes/:?]
              at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[classes/:?]
              at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[classes/:?]
              at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[classes/:?]
              at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[classes/:?]
              at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[classes/:?]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997) ~[classes/:?]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:876) ~[classes/:?]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:759) ~[classes/:?]
              at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[classes/:?]
              at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[classes/:?]
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[classes/:?]
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) ~[classes/:?]
              at java.lang.Thread.run(Thread.java:829) ~[?:?] 
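A minimal sketch of the suspected race, assuming (the stack trace does not show this directly) that close() unconditionally dereferences a fetcher field that run() only assigns after a slow initialization. RacySourceModel, Fetcher, awaitTermination() and closeBeforeRunThrowsNpe() are hypothetical names invented for this illustration; this is plain Java, not Flink's FlinkKinesisConsumer or KinesisDataFetcher API.

```java
/**
 * Hypothetical, simplified model of the lifecycle described in this issue.
 * These are NOT Flink classes: run() assigns the fetcher only after a slow
 * initialization, so a close() that arrives earlier sees a null field.
 */
class RacySourceModel {
    /** Stand-in for KinesisDataFetcher (hypothetical). */
    static final class Fetcher {
        void awaitTermination() {
            // placeholder: a real fetcher would release its resources here
        }
    }

    // Assigned by run(); remains null until initialization has finished.
    private volatile Fetcher fetcher;

    void run(long initDelayMillis) throws InterruptedException {
        Thread.sleep(initDelayMillis); // simulates slow fetcher start-up
        fetcher = new Fetcher();
    }

    // Buggy: dereferences the field without a null check.
    void close() {
        fetcher.awaitTermination();
    }

    /** Calls close() while run() is still initializing; true if that threw an NPE. */
    static boolean closeBeforeRunThrowsNpe() {
        RacySourceModel source = new RacySourceModel();
        Thread runner = new Thread(() -> {
            try {
                source.run(2_000);
            } catch (InterruptedException e) {
                // interrupted during initialization: fetcher is never assigned
            }
        });
        runner.start();
        boolean npe = false;
        try {
            source.close(); // "stop-with-savepoint" arrives too early
        } catch (NullPointerException e) {
            npe = true;
        }
        runner.interrupt();
        try {
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return npe;
    }
}
```

The 2-second sleep stands in for the slow fetcher start-up; calling close() while it is still pending reproduces the NullPointerException in this model.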

      We were able to reproduce this issue in FlinkKinesisITCase even with a standard (non-EFO) KDS consumer. The fix is simply an additional null check in the consumer's close() method. We will also add an IT case for it.
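The null-check fix described above could look roughly like the following sketch. SafeCloseSourceModel and its nested Fetcher are hypothetical stand-ins, and the assumption that the null field is the fetcher is ours; the actual patch to FlinkKinesisConsumer#close() may differ.

```java
/**
 * Hypothetical, simplified model (not Flink's FlinkKinesisConsumer) showing the
 * null-check pattern: close() tolerates a fetcher that run() never created.
 */
class SafeCloseSourceModel {
    /** Stand-in for KinesisDataFetcher (hypothetical). */
    static final class Fetcher {
        private volatile boolean terminated;

        void awaitTermination() {
            terminated = true; // placeholder for real shutdown work
        }

        boolean isTerminated() {
            return terminated;
        }
    }

    // Assigned by run(); may still be null if close() is called early.
    private volatile Fetcher fetcher;

    void run() {
        fetcher = new Fetcher(); // in the real connector this follows a slow setup
    }

    void close() {
        // Read the volatile field once so the check and the call use the same reference.
        Fetcher current = this.fetcher;
        if (current != null) {
            current.awaitTermination();
        }
        this.fetcher = null;
    }

    Fetcher currentFetcher() {
        return fetcher;
    }
}
```

Copying the field into a local variable means the null check and the call operate on the same reference even if another thread changes the field in between. "close() on a source whose run() never completed must not throw" is the behaviour the planned IT case would check end-to-end.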

            People

              Astamur Kirillin (astamur)
              Votes: 0
              Watchers: 2
