FLINK-4577: Re-enable transparent reshard handling in Kinesis Consumer

    Details

    • Type: Task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Kinesis Connector
    • Labels: None

      Description

      In FLINK-4341, we disabled transparent reshard handling in the Kinesis consumer as a short-term workaround until FLINK-4576 is resolved.

      This ticket tracks the progress of re-enabling it by implementing a LowWatermarkListener interface as described in FLINK-4576.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3458

          FLINK-4577 [kinesis] Transparent reshard handling for FlinkKinesisConsumer

          This PR uses the new `SourceFunction.SourceContext#markAsTemporarilyIdle()` to let the Flink Kinesis Consumer handle reshards transparently (i.e., without the need to fail and restart the job when a Kinesis reshard operation occurs).
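
          For context, a minimal sketch of how a source can use this API. This is an illustrative custom SourceFunction, not the actual FlinkKinesisConsumer code; the shard-assignment helpers are hypothetical placeholders:

          ```java
          import java.util.Collections;
          import java.util.List;

          import org.apache.flink.streaming.api.functions.source.SourceFunction;

          // Illustrative only: shows the markAsTemporarilyIdle() pattern that the PR applies
          // inside the Kinesis consumer when a subtask currently owns no shards.
          public class IdleAwareSource implements SourceFunction<String> {

              private volatile boolean running = true;

              @Override
              public void run(SourceContext<String> ctx) throws Exception {
                  while (running) {
                      List<String> records = pollAssignedShards();          // hypothetical helper
                      if (records.isEmpty() && !hasAssignedShards()) {       // hypothetical helper
                          // After a reshard this subtask may end up with no shards; instead of
                          // failing the job, declare it idle so downstream watermarks can advance.
                          ctx.markAsTemporarilyIdle();
                          Thread.sleep(1000L); // back off before checking for newly discovered shards
                      } else {
                          synchronized (ctx.getCheckpointLock()) {
                              for (String record : records) {
                                  // Emitting records again automatically switches the subtask back to active.
                                  ctx.collect(record);
                              }
                          }
                      }
                  }
              }

              @Override
              public void cancel() {
                  running = false;
              }

              // Hypothetical stand-ins for the real shard assignment / fetching logic.
              private List<String> pollAssignedShards() { return Collections.emptyList(); }
              private boolean hasAssignedShards() { return false; }
          }
          ```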

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-4577

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3458.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3458


          commit 60f1a715c220cfb31a22b5812942dc44a9b42652
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-03-02T10:56:39Z

          FLINK-4577 [kinesis] Transparent reshard handling for FlinkKinesisConsumer


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3458

          Looks very nice and cleans up a lot of code.
          Do you know if we can check with some Kinesis users to validate that this works in practice as expected?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3458

          @StephanEwen yes, I think that would be nice. I'll try to reach out to some of the Kinesis Flink users I know of.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3458

          FYI: Got in touch with @skidder, who is willing to try the patch. Thanks a lot, Scott!

          githubbot ASF GitHub Bot added a comment -

          Github user skidder commented on the issue:

          https://github.com/apache/flink/pull/3458

          @tzulitai I was able to scale up the number of shards on a Kinesis stream without any interruption in processing. Looks good!

          1. Configuration
             - Flink Cluster: Single Job Manager running as a standalone cluster, with a single Task Manager with 4 slots. Both Flink servers were built from the source of this feature branch.
             - Application: Single Flink application running with a parallelism of 4 (a sketch of such a job setup is shown after this list).
             - Kinesis stream: Stream name `mux_video_events_staging` with one shard.
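
          A minimal sketch of a job along these lines, assuming the Flink 1.3-era Kinesis connector API. The stream name comes from the configuration above; the region, credentials handling, and the print() sink are assumptions not stated in the comment:

          ```java
          import java.util.Properties;

          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
          import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
          import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

          public class ReshardTestJob {

              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setParallelism(4); // matches the 4 task slots / parallelism used in this test

                  Properties consumerConfig = new Properties();
                  consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // region is an assumption
                  // Start from LATEST, matching the LATEST_SEQUENCE_NUM starting state seen in the logs below.
                  consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
                  // Credentials are assumed to come from the environment / default provider chain;
                  // depending on the setup, explicit access key properties may be needed instead.

                  env.addSource(new FlinkKinesisConsumer<>(
                                  "mux_video_events_staging", new SimpleStringSchema(), consumerConfig))
                     .print(); // stand-in sink; the actual sink used in the test is not described

                  env.execute("FLINK-4577 reshard handling test (sketch)");
              }
          }
          ```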
          2. Test Steps & Results

          On startup the Flink application has Sub-task (1) read from the one shard on the stream:
          ```
          2017-03-29 15:55:03,306 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 1 will be seeded with initial shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49569797317567661038287361310393874557410775187880673282,}}'}, starting state set as sequence number LATEST_SEQUENCE_NUM
          2017-03-29 15:55:03,312 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 1 will start consuming seeded shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49569797317567661038287361310393874557410775187880673282,}}'} from sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0
          ```

          Next, I increased the number of shards from 1 to 2 (a sketch of one way to trigger such a reshard follows the log excerpt below). Sub-task (1), previously responsible for reading from the one shard, is marked as temporarily idle; Sub-tasks (2) & (3) begin reading from the 2 new shards:
          ```
          2017-03-29 17:36:36,741 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 3 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000002,ParentShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49571795731277369408532401914933651574587478731630051362,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
          2017-03-29 17:36:38,606 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 1 has reached the end of subscribed shard: KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49569797317567661038287361310393874557410775187880673282,}}'}
          2017-03-29 17:36:38,606 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 1 has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...
          2017-03-29 17:36:45,240 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 2 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000001,ParentShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 170141183460469231731687303715884105728},SequenceNumberRange: {StartingSequenceNumber: 49571795731255068663333871291792115856314830370124070930,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
          ```
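
          The comment does not say how the stream was scaled up. Purely as an illustration (an assumption, not taken from the ticket), one way to trigger such a reshard with the AWS SDK for Java:

          ```java
          import com.amazonaws.services.kinesis.AmazonKinesis;
          import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
          import com.amazonaws.services.kinesis.model.ScalingType;
          import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;

          public class ReshardStream {
              public static void main(String[] args) {
                  // Uses the default credentials/region provider chain (an assumption).
                  AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

                  // Double the shard count of the test stream; UNIFORM_SCALING lets Kinesis
                  // choose the split points itself.
                  kinesis.updateShardCount(new UpdateShardCountRequest()
                          .withStreamName("mux_video_events_staging")
                          .withTargetShardCount(2)
                          .withScalingType(ScalingType.UNIFORM_SCALING));
              }
          }
          ```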

          I then increased the number of shards from 2 to 4. Sub-task (0) reads from a new shard; Sub-tasks (2) & (3) stop reading from their closed shards and begin reading from the new shards; Sub-task (1), which was previously marked as temporarily idle, starts reading from a new shard:
          ```
          2017-03-29 17:41:58,005 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 2 has reached the end of subscribed shard: KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000001,ParentShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 170141183460469231731687303715884105728},SequenceNumberRange: {StartingSequenceNumber: 49571795731255068663333871291792115856314830370124070930,}}'}
          2017-03-29 17:41:58,393 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 1 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000004,ParentShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052865,EndingHashKey: 170141183460469231731687303715884105728},SequenceNumberRange: {StartingSequenceNumber: 49571795845144974392229763675615029074730034502679134274,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
          2017-03-29 17:41:59,602 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 0 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000003,ParentShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070591730234615865843651857942052864},SequenceNumberRange: {StartingSequenceNumber: 49571795845122673647031233052473493356457386141173153842,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
          2017-03-29 17:42:28,173 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 3 has reached the end of subscribed shard: KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000002,ParentShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49571795731277369408532401914933651574587478731630051362,}}'}
          2017-03-29 17:42:29,520 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 2 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000005,ParentShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105729,EndingHashKey: 255211775190703847597530955573826158593},SequenceNumberRange: {StartingSequenceNumber: 49571795855871632832722993406693709563873898448640016466,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
          2017-03-29 17:42:31,183 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 3 has discovered a new shard KinesisStreamShard{streamName='mux_video_events_staging', shard='{ShardId: shardId-000000000006,ParentShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158594,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49571795855893933577921524029835245282146546810145996898,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3458

          @skidder Thanks a lot for the testing and the details! This is really helpful.

          The logs and behaviour are as expected. I personally think this is good to merge.
          @rmetzger do you have any last comments for this? If not, I'll merge it later today.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3458

          Merging this to `master` now...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3458

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/1932240.

            People

            • Assignee: tzulitai Tzu-Li (Gordon) Tai
            • Reporter: tzulitai Tzu-Li (Gordon) Tai
            • Votes: 0
            • Watchers: 5

              Dates

              • Created:
              • Updated:
              • Resolved:

                Development