Flink / FLINK-6653

Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Kinesis Connector
    • Labels:
      None

      Description

      Currently, the Kinesis consumer directly serializes AWS's Shard instances in its checkpoints. This makes bumping the AWS library version hard, since any change AWS makes to the Shard class will break checkpoint compatibility.

      We should either implement custom serialization for KinesisStreamShard, or break the information in Shard down into our own fields. Ideally, KinesisStreamShard and SequenceNumber should be made non-serializable, so that Java serialization is avoided in the checkpoints altogether.
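      The first option, custom serialization for KinesisStreamShard, could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not code from the actual fix: Shard here is a hypothetical, flattened stand-in that models only a few fields and is deliberately not Serializable, and CustomSerializationDemo.roundTrip is a made-up helper. The writeObject/readObject hooks are the standard java.io mechanism; with them, the checkpoint bytes contain only strings chosen by Flink rather than AWS's class layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, flattened stand-in for AWS's Shard (the real class nests hash
// keys and sequence numbers in range objects). Deliberately NOT Serializable.
class Shard {
    String shardId;
    String parentShardId;
    String startingHashKey;
    String endingHashKey;
}

// Sketch: KinesisStreamShard stays Serializable, but the Shard object itself is
// never written. Only plain strings extracted from it reach the checkpoint.
class KinesisStreamShard implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String streamName;
    private transient Shard shard; // skipped by default serialization

    KinesisStreamShard(String streamName, Shard shard) {
        this.streamName = streamName;
        this.shard = shard;
    }

    String getStreamName() { return streamName; }

    Shard getShard() { return shard; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // writes streamName
        out.writeObject(shard.shardId);
        out.writeObject(shard.parentShardId);
        out.writeObject(shard.startingHashKey);
        out.writeObject(shard.endingHashKey);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restores streamName
        Shard restored = new Shard();
        restored.shardId = (String) in.readObject();
        restored.parentShardId = (String) in.readObject();
        restored.startingHashKey = (String) in.readObject();
        restored.endingHashKey = (String) in.readObject();
        this.shard = restored;
    }
}

class CustomSerializationDemo {
    // Serializes and deserializes with plain Java serialization.
    static KinesisStreamShard roundTrip(KinesisStreamShard original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (KinesisStreamShard) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Shard shard = new Shard();
        shard.shardId = "shardId-000000000000";
        shard.startingHashKey = "0";
        shard.endingHashKey = "99";
        KinesisStreamShard copy = roundTrip(new KinesisStreamShard("my-stream", shard));
        System.out.println(copy.getStreamName() + " / " + copy.getShard().shardId);
    }
}
```

      One drawback of this option is that it still relies on Java serialization of KinesisStreamShard itself, so changes to that class's own fields remain a compatibility concern; that is part of why avoiding Java serialization entirely is preferable.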

          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Thanks for the contribution Tony!

          Fixed for master via 2597e7e1803da66190ff545e705a6a4e6a6f76a2.
          Fixed for 1.3 via 7fe4df3361136755e0ca9c5647b178fdb65053f2.

          githubbot ASF GitHub Bot added a comment -

          GitHub user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3994

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3994

          @tony810430 thanks for the PR! I'd like to merge this for 1.3, to avoid another round of state migration. Will merge with some minor cleanups.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tony810430 opened a pull request:

          https://github.com/apache/flink/pull/3994

          FLINK-6653 Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

          For the proposal, please refer to the comment on JIRA.

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [v] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [x] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [v] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tony810430/flink FLINK-6653

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3994.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3994


          commit 9ebec3a7242cd225626a9e17bad4e315f9cbab53
          Author: Tony Wei <tony19920430@gmail.com>
          Date: 2017-05-25T02:39:22Z

          FLINK-6653 Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints


          tonywei Wei-Che Wei added a comment - - edited

          Hi Tzu-Li (Gordon) Tai,

          I would like to take over this issue. Here is my proposal.

          Description:

          1. The consumer will read either of two possible states, the new KinesisStreamShardV2 or the legacy KinesisStreamShard, and merge them into KinesisStreamShardV2.
          2. Convert KinesisStreamShardV2 to KinesisStreamShardHandle so that internal classes can interact with the AWS library.
          3. Convert KinesisStreamShardHandle back to KinesisStreamShardV2 when writing the new state.
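          The three steps above could be sketched as follows. This is a simplified illustration, not the code from the pull request: all three classes are hypothetical stand-ins that carry only a stream name and a shard ID, sequence numbers are plain strings instead of Flink's SequenceNumber, restored state is modeled as two maps (legacy and new) rather than Flink's actual state objects, and the method names mergeRestoredState, toHandle and snapshot are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Legacy state key (Java-serialized in old checkpoints). Simplified stand-in.
class KinesisStreamShard {
    final String streamName;
    final String shardId;

    KinesisStreamShard(String streamName, String shardId) {
        this.streamName = streamName;
        this.shardId = shardId;
    }
}

// New state key. Simplified stand-in; equals/hashCode make it usable as a map key.
class KinesisStreamShardV2 {
    final String streamName;
    final String shardId;

    KinesisStreamShardV2(String streamName, String shardId) {
        this.streamName = streamName;
        this.shardId = shardId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KinesisStreamShardV2)) return false;
        KinesisStreamShardV2 other = (KinesisStreamShardV2) o;
        return Objects.equals(streamName, other.streamName) && Objects.equals(shardId, other.shardId);
    }

    @Override
    public int hashCode() { return Objects.hash(streamName, shardId); }
}

// Runtime-only handle used by the fetcher and shard consumers; never checkpointed.
class KinesisStreamShardHandle {
    final String streamName;
    final String shardId;

    KinesisStreamShardHandle(String streamName, String shardId) {
        this.streamName = streamName;
        this.shardId = shardId;
    }
}

class ShardStateFlow {
    // Step 1: merge whichever state was restored into V2-keyed state.
    static Map<KinesisStreamShardV2, String> mergeRestoredState(
            Map<KinesisStreamShard, String> legacyState,
            Map<KinesisStreamShardV2, String> newState) {
        Map<KinesisStreamShardV2, String> merged = new HashMap<>();
        for (Map.Entry<KinesisStreamShard, String> entry : legacyState.entrySet()) {
            KinesisStreamShard legacy = entry.getKey();
            merged.put(new KinesisStreamShardV2(legacy.streamName, legacy.shardId), entry.getValue());
        }
        merged.putAll(newState); // assumption: new-format entries win on overlap
        return merged;
    }

    // Step 2: state key -> runtime handle for code that talks to the AWS library.
    static KinesisStreamShardHandle toHandle(KinesisStreamShardV2 shard) {
        return new KinesisStreamShardHandle(shard.streamName, shard.shardId);
    }

    // Step 3: runtime handles -> state keys, used when writing a checkpoint.
    static Map<KinesisStreamShardV2, String> snapshot(Map<KinesisStreamShardHandle, String> progress) {
        Map<KinesisStreamShardV2, String> state = new HashMap<>();
        for (Map.Entry<KinesisStreamShardHandle, String> entry : progress.entrySet()) {
            KinesisStreamShardHandle handle = entry.getKey();
            state.put(new KinesisStreamShardV2(handle.streamName, handle.shardId), entry.getValue());
        }
        return state;
    }

    public static void main(String[] args) {
        Map<KinesisStreamShard, String> legacy = new HashMap<>();
        legacy.put(new KinesisStreamShard("my-stream", "shardId-000000000000"), "100");
        Map<KinesisStreamShardV2, String> restored = mergeRestoredState(legacy, new HashMap<>());

        Map<KinesisStreamShardHandle, String> progress = new HashMap<>();
        for (Map.Entry<KinesisStreamShardV2, String> e : restored.entrySet()) {
            progress.put(toHandle(e.getKey()), e.getValue());
        }
        System.out.println(snapshot(progress).equals(restored)); // prints true
    }
}
```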

          Proposed Changes:

          1. Introduce two models:
            1. KinesisStreamShardV2: It stores the stream name and all the information in Shard, decoupling the state from the AWS library. It will be the new state.
            2. KinesisStreamShardHandle: It is the same as KinesisStreamShard but is not serializable. It is used in KinesisFetcher and ShardConsumer to distinguish it from the legacy KinesisStreamShard state, so that it can change along with any update to the AWS library.
          2. Add KinesisStreamShardHandle to KinesisStreamShardState.
          3. Make KinesisStreamShardV2 and SequenceNumber POJO types.
          4. Add two util functions to convert between KinesisStreamShardV2 and KinesisStreamShardHandle.
          5. Add a util function to convert KinesisStreamShard to KinesisStreamShardV2.
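          A rough sketch of the two models and the conversion util functions is below. Everything in it is a hypothetical simplification: Shard is a flattened stand-in for the AWS class, the names ShardConversions, toStreamShardV2 and toStreamShardHandle are made up for illustration, and the classes are package-private only so the example fits in one file (Flink's POJO rules would require KinesisStreamShardV2 to be a public class). Since a legacy KinesisStreamShard also holds a stream name and a Shard, the legacy conversion could plausibly reuse the same (streamName, shard) helper.

```java
import java.util.Objects;

// Hypothetical, flattened stand-in for AWS's Shard. The real class nests the
// hash keys in a HashKeyRange and the sequence numbers in a SequenceNumberRange.
class Shard {
    String shardId;
    String parentShardId;
    String adjacentParentShardId;
    String startingHashKey;
    String endingHashKey;
    String startingSequenceNumber;
    String endingSequenceNumber;
}

// Runtime-only handle: deliberately not Serializable, so it may track AWS changes freely.
final class KinesisStreamShardHandle {
    private final String streamName;
    private final Shard shard;

    KinesisStreamShardHandle(String streamName, Shard shard) {
        this.streamName = streamName;
        this.shard = shard;
    }

    String getStreamName() { return streamName; }

    Shard getShard() { return shard; }
}

// Checkpointed state: only strings, a public no-arg constructor and public fields.
class KinesisStreamShardV2 {
    public String streamName;
    public String shardId;
    public String parentShardId;
    public String adjacentParentShardId;
    public String startingHashKey;
    public String endingHashKey;
    public String startingSequenceNumber;
    public String endingSequenceNumber;

    public KinesisStreamShardV2() {}

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KinesisStreamShardV2)) return false;
        KinesisStreamShardV2 that = (KinesisStreamShardV2) o;
        return Objects.equals(streamName, that.streamName)
                && Objects.equals(shardId, that.shardId)
                && Objects.equals(parentShardId, that.parentShardId)
                && Objects.equals(adjacentParentShardId, that.adjacentParentShardId)
                && Objects.equals(startingHashKey, that.startingHashKey)
                && Objects.equals(endingHashKey, that.endingHashKey)
                && Objects.equals(startingSequenceNumber, that.startingSequenceNumber)
                && Objects.equals(endingSequenceNumber, that.endingSequenceNumber);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shardId, parentShardId, adjacentParentShardId,
                startingHashKey, endingHashKey, startingSequenceNumber, endingSequenceNumber);
    }
}

final class ShardConversions {
    // Copies every Shard field into the state object (shared by handle and legacy paths).
    static KinesisStreamShardV2 toStreamShardV2(String streamName, Shard shard) {
        KinesisStreamShardV2 state = new KinesisStreamShardV2();
        state.streamName = streamName;
        state.shardId = shard.shardId;
        state.parentShardId = shard.parentShardId;
        state.adjacentParentShardId = shard.adjacentParentShardId;
        state.startingHashKey = shard.startingHashKey;
        state.endingHashKey = shard.endingHashKey;
        state.startingSequenceNumber = shard.startingSequenceNumber;
        state.endingSequenceNumber = shard.endingSequenceNumber;
        return state;
    }

    static KinesisStreamShardV2 toStreamShardV2(KinesisStreamShardHandle handle) {
        return toStreamShardV2(handle.getStreamName(), handle.getShard());
    }

    // Rebuilds the AWS-facing Shard from the checkpointed fields.
    static KinesisStreamShardHandle toStreamShardHandle(KinesisStreamShardV2 state) {
        Shard shard = new Shard();
        shard.shardId = state.shardId;
        shard.parentShardId = state.parentShardId;
        shard.adjacentParentShardId = state.adjacentParentShardId;
        shard.startingHashKey = state.startingHashKey;
        shard.endingHashKey = state.endingHashKey;
        shard.startingSequenceNumber = state.startingSequenceNumber;
        shard.endingSequenceNumber = state.endingSequenceNumber;
        return new KinesisStreamShardHandle(state.streamName, shard);
    }
}
```

      With this split, a change to AWS's Shard would only touch the conversion methods; the checkpointed KinesisStreamShardV2 format would stay the same.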

          Test Plan:

          1. Update all tests to use the new and modified models.
          2. Add a migration test that makes sure state is restored when only legacy state exists in the state backend.
          3. Unit tests for the util functions.
          4. Test that the new state is serialized by the POJO serializer.
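          The last test item could be approximated without Flink by checking the POJO conditions reflectively, as in the sketch below. This is a simplification under assumptions, not Flink's own type analysis: it inspects only declared fields (not superclasses), recognizes only get/set accessor names, and both sample classes are hypothetical (SequenceNumber is merely shaped like the proposed POJO). Inside Flink itself, a more faithful test would presumably assert that TypeExtractor.getForClass(...) returns a PojoTypeInfo for the state class.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class PojoRuleCheck {

    // Simplified approximation of Flink's POJO conditions (declared fields only).
    static boolean followsPojoRules(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        // Nested classes must be static (no hidden reference to an outer instance).
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(clazz.getModifiers())) {
            return false;
        }
        try {
            Constructor<?> noArg = clazz.getDeclaredConstructor();
            if (!Modifier.isPublic(noArg.getModifiers())) {
                return false;
            }
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue; // not part of the serialized state
            }
            if (!Modifier.isPublic(mods) && !hasPublicGetterAndSetter(clazz, field)) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasPublicGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            Method getter = clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, field.getType());
            return getter.getReturnType() == field.getType();
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Shaped like the proposed SequenceNumber: private fields with public accessors.
    public static class SequenceNumber {
        private String sequenceNumber;
        private long subSequenceNumber;

        public SequenceNumber() {}

        public String getSequenceNumber() { return sequenceNumber; }
        public void setSequenceNumber(String sequenceNumber) { this.sequenceNumber = sequenceNumber; }
        public long getSubSequenceNumber() { return subSequenceNumber; }
        public void setSubSequenceNumber(long subSequenceNumber) { this.subSequenceNumber = subSequenceNumber; }
    }

    // Hypothetical immutable class: no no-arg constructor, so it is not a POJO.
    public static class ImmutableShardInfo {
        private final String shardId;

        public ImmutableShardInfo(String shardId) { this.shardId = shardId; }

        public String getShardId() { return shardId; }
    }

    public static void main(String[] args) {
        System.out.println(followsPojoRules(SequenceNumber.class));     // prints true
        System.out.println(followsPojoRules(ImmutableShardInfo.class)); // prints false
    }
}
```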

            People

            • Assignee:
              tonywei Wei-Che Wei
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              4
