Flink / FLINK-6311

NPE in FlinkKinesisConsumer if source was closed before run

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.2.2
    • Component/s: Kinesis Connector
    • Labels:
      None

      Description

      This was reported by a user: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html

      The shutdownFetcher method of KinesisDataFetcher is not protected against the condition when the source was closed before it started running. Both mainThread and shardConsumersExecutor should have null checks.
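A minimal sketch of the null-guarded shutdown the description asks for. `SketchFetcher` is a hypothetical stand-in, not Flink's `KinesisDataFetcher`; the one assumption carried over is that both fields are assigned only once the fetcher starts running, so a shutdown that arrives first sees them as `null`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a fetcher whose fields are only assigned once it starts running.
class SketchFetcher {
    private volatile boolean running = true;
    private volatile Thread mainThread;                      // assigned in runFetcher()
    private volatile ExecutorService shardConsumersExecutor; // assumption: also assigned in runFetcher()

    // Called on the source thread; a shutdown may arrive before this ever runs.
    void runFetcher() {
        mainThread = Thread.currentThread();
        shardConsumersExecutor = Executors.newCachedThreadPool();
        // ... shard discovery loop omitted ...
    }

    // Safe to call at any time, including before runFetcher() has assigned the fields.
    // The fields are never reset to null, so checking and then using them is safe here.
    void shutdownFetcher() {
        running = false;
        if (mainThread != null) {
            mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
        }
        if (shardConsumersExecutor != null) {
            shardConsumersExecutor.shutdownNow();
        }
    }

    boolean isRunning() {
        return running;
    }
}
```

Calling `shutdownFetcher()` on a fresh instance (the source closed before it ran) completes without a `NullPointerException`; without the two guards it would throw on `mainThread.interrupt()`.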


          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment - Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/a0249d9 . Resolved for release-1.2 via http://git-wip-us.apache.org/repos/asf/flink/commit/80dc704 .
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3738

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/3738

          @tzulitai You are very welcome. It is my pleasure.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3738

          Thanks @zhangminglei. LGTM!

          Merging to master and release-1.2 (will merge a bit later today).

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/3738

          @tzulitai Hi, I have updated the code. Please check it out. Thanks, I appreciate it.

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/3738

          @StephanEwen @tzulitai Thanks for the useful information. I will fix it soon. Much appreciated.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3738#discussion_r112302446

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
          @@ -408,12 +408,15 @@ public void runFetcher() throws Exception {
                */
               public void shutdownFetcher() {
                   running = false;
          -        mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
          -
          +        if (mainThread != null) {
          +            mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
          +        }

                   if (LOG.isInfoEnabled()) {
                       LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
                   }

          -        shardConsumersExecutor.shutdownNow();
          +        if (shardConsumersExecutor != null) {
          --- End diff ---

          Actually I think this is a redundant `null` check, because `shardConsumersExecutor` is final.
          It should never be null, so this null check might actually be confusing to other readers.
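The reviewer's point can be sketched as follows, assuming (as the comment states) that the executor is created when the fetcher is constructed and held in a `final` field, while the thread reference is only set once the fetcher runs. `FinalFieldFetcher` is a hypothetical class, not Flink code. A `final` field must be definitely assigned before the constructor completes, so as long as `this` does not escape the constructor no caller can observe it as `null`; only `mainThread` needs a guard.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical fetcher: the executor exists from construction, the main thread only after run.
class FinalFieldFetcher {
    // final: definitely assigned in the constructor, so never null afterwards
    private final ExecutorService shardConsumersExecutor;
    // assigned later by the running thread, so it may still be null at shutdown
    private volatile Thread mainThread;

    FinalFieldFetcher() {
        this.shardConsumersExecutor = Executors.newCachedThreadPool();
    }

    void shutdownFetcher() {
        if (mainThread != null) {
            mainThread.interrupt();
        }
        // No null check needed: a final field assigned in the constructor cannot be null here.
        shardConsumersExecutor.shutdownNow();
    }

    boolean isExecutorShutdown() {
        return shardConsumersExecutor.isShutdown();
    }
}
```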

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3738

          LGTM once Stephan's comment about making the `mainThread` variable `volatile` is addressed.
          Could you do that, @zhangminglei?
          Once it's updated I'll proceed to merge this, thanks.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3738

          The `shardConsumersExecutor` variable is final. No need to make it `volatile`.

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/3738

          @StephanEwen Shouldn't we make the `shardConsumersExecutor` variable `volatile` as well?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3738

          Looks good. To make this really safe, we should actually make the `mainThread` variable `volatile`.
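Stephan's suggestion concerns visibility across threads: `mainThread` is written by the thread that runs the fetcher and read by whichever thread calls `shutdownFetcher()`. Declaring it `volatile` guarantees the reader sees the latest write, so the interrupt reaches a fetcher that is sleeping for the discovery interval. The sketch below uses a hypothetical `VolatileFetcher`, with a plain sleep standing in for the discovery interval, and only shows the intended behavior; a missing `volatile` is a memory-visibility bug and cannot be reproduced reliably in a short example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical fetcher: runFetcher() publishes its own thread so that another thread can wake it up.
class VolatileFetcher {
    private volatile boolean running = true;
    // volatile: the write made by the fetcher thread is visible to the thread calling shutdownFetcher()
    private volatile Thread mainThread;
    private final long discoveryIntervalMillis;
    // Latches are only here so a caller can sequence start/stop in a test.
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch stopped = new CountDownLatch(1);

    VolatileFetcher(long discoveryIntervalMillis) {
        this.discoveryIntervalMillis = discoveryIntervalMillis;
    }

    void runFetcher() {
        mainThread = Thread.currentThread();
        started.countDown();
        try {
            while (running) {
                try {
                    TimeUnit.MILLISECONDS.sleep(discoveryIntervalMillis); // sleeping for the discovery interval
                } catch (InterruptedException e) {
                    // woken up early; the loop re-checks 'running'
                }
            }
        } finally {
            stopped.countDown();
        }
    }

    void shutdownFetcher() {
        running = false; // written before the interrupt, so the woken loop observes it
        if (mainThread != null) {
            mainThread.interrupt();
        }
    }
}
```

With a 60-second interval, the worker exits almost immediately after `shutdownFetcher()` instead of waiting out the sleep.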

          githubbot ASF GitHub Bot added a comment -

          GitHub user zhangminglei opened a pull request:

          https://github.com/apache/flink/pull/3738

          FLINK-6311 [Kinesis Connector] NPE in FlinkKinesisConsumer if source was closed before run.

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zhangminglei/flink flink-6311

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3738.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3738


          commit ff6dcea5fc3a1f8ebb8aca73d61736818b9fa7f3
          Author: zhangminglei <zml13856086071@163.com>
          Date: 2017-04-19T09:43:57Z

          FLINK-6311 [Kinesis Connector] NPE in FlinkKinesisConsumer if source was closed before run.


          mingleizhang mingleizhang added a comment -

          Tzu-Li (Gordon) Tai Thanks, I will open a PR for this JIRA soon. Much appreciated.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          I think we can just do:

          if (mainThread != null) {
              mainThread.interrupt();
          }
          

          and likewise for the executor.

          The reason is that there can be cases where the consumer is closed before these fields reference actual values (they are assigned after run() starts, but there is no guarantee about when close() is called on the consumer). So we should just do null checks to safeguard against such conditions.
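A related, commonly used Java idiom (not what the PR merged) is to read a `volatile` field once into a local variable and null-check that, so the check and the call are guaranteed to use the same value. In `shutdownFetcher()` the fields are never reset to `null`, so the plain `if (mainThread != null)` check is already correct; the local copy only matters when another thread may change the field between the check and the use. `GuardedInterrupter` is hypothetical.

```java
// Hypothetical holder showing a single-read null guard for a field another thread may change.
class GuardedInterrupter {
    private volatile Thread mainThread; // may be set, or cleared, by another thread

    void setMainThread(Thread t) {
        mainThread = t;
    }

    /** Returns true if a thread was interrupted. */
    boolean interruptIfPresent() {
        Thread t = mainThread; // read the volatile field exactly once
        if (t != null) {
            t.interrupt();     // uses the same value that was null-checked
            return true;
        }
        return false;
    }
}
```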

          mingleizhang mingleizhang added a comment - - edited

          Tzu-Li (Gordon) Tai Hi, here is some sample code; please check it out and let me know what you think. BTW, should we use more descriptive error messages than just "mainThread is null"? If so, what kind of messages should we use here? Thanks!

          	public void shutdownFetcher() {
          		running = false;
          		checkNotNull(mainThread, "mainThread is null.");
          		mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
          
          		if (LOG.isInfoEnabled()) {
          			LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
          		}
          		checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
          		shardConsumersExecutor.shutdownNow();
          	}
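The precondition approach does not prevent the failure: Flink's `Preconditions.checkNotNull` throws a `NullPointerException` when its argument is `null`, so closing the source before it runs would still fail, only with a message attached. In this sketch `java.util.Objects.requireNonNull`, which behaves the same way, stands in for it to avoid a Flink dependency; `PreconditionVsGuard` is hypothetical.

```java
import java.util.Objects;

// Hypothetical comparison: a precondition check vs. a null guard when shutdown runs before run().
class PreconditionVsGuard {
    private volatile Thread mainThread; // still null: run() has not started yet

    // Mirrors the precondition proposal: fails fast, but still throws NullPointerException.
    void shutdownWithPrecondition() {
        Objects.requireNonNull(mainThread, "mainThread is null.");
        mainThread.interrupt();
    }

    // Mirrors the null-check approach: skips the interrupt when there is nothing to interrupt.
    void shutdownWithGuard() {
        if (mainThread != null) {
            mainThread.interrupt();
        }
    }
}
```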
          
          mingleizhang mingleizhang added a comment -

          Tzu-Li (Gordon) Tai Thanks for the useful information, I really appreciate it. Yep, I would like to work on this and will start on it soon.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          The flink-connector-kinesis module is only included in a separate build profile ("include-kinesis"). We did this because of AWS licensing issues, so the Kinesis connector is not built along with the rest of the code. But this shouldn't be relevant for this JIRA; you can still proceed to fix it if you want to.

          mingleizhang mingleizhang added a comment - - edited

          Tzu-Li (Gordon) Tai I just looked at FlinkKinesisConsumer, which is in the package org.apache.flink.streaming.connectors.kinesis in the flink-connector-kinesis module. flink-connector-kinesis lives under the flink-connectors module, whose pom.xml does not list flink-connector-kinesis. I think we should add the flink-connector-kinesis module to the flink-connectors pom.xml and then return to this issue. What do you think?


            People

            • Assignee:
              mingleizhang mingleizhang
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              4
