FLINK-4821

Implement rescalable non-partitioned state for Kinesis Connector

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Kinesis Connector
    • Labels: None

      Description

      FLINK-4379 added the rescalable non-partitioned state feature, along with an implementation for the Kafka connector.
      The AWS Kinesis connector would benefit from the same feature and should implement it too. This ticket tracks that work.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tony810430 opened a pull request:

          https://github.com/apache/flink/pull/3001

          FLINK-4821 [kinesis] Implement rescalable non-partitioned state for Kinesis Connector

          Implement ListCheckpointed interface on Kinesis Consumer

          As a reminder, I returned an empty list instead of null in some cases in snapshotState, because the snapshotState method in AbstractUdfStreamOperator doesn't handle a null list and would throw a NullPointerException in the for-each statement. (see lines 106 to 119 in AbstractUdfStreamOperator)
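          A minimal sketch of that failure mode (simplified stand-in names, not Flink's actual operator code): iterating a null snapshot result throws `NullPointerException`, while an empty list is simply skipped.

          ```java
          import java.util.Collections;
          import java.util.List;

          class NullSnapshotSketch {

              // Stand-in for the operator-side code that iterates the snapshot result.
              static void writeSnapshot(List<String> partitionableState) {
                  // The enhanced for loop calls partitionableState.iterator(), so a
                  // null argument throws NullPointerException before the body runs.
                  for (String element : partitionableState) {
                      System.out.println("writing " + element);
                  }
              }

              public static void main(String[] args) {
                  writeSnapshot(Collections.emptyList()); // fine: the loop body is skipped
                  writeSnapshot(null);                    // throws NullPointerException
              }
          }
          ```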

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tony810430/flink FLINK-4821

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3001.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3001


          commit fa0829738fd075fb4e65e31698acbc8fd0f16af2
          Author: 魏偉哲 <tonywei@tonyweis-macbook-pro.local>
          Date: 2016-12-14T02:18:25Z

          FLINK-4821 Implement rescalable non-partitioned state for Kinesis Connector


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r92366382

          — Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java —
          @@ -377,19 +378,19 @@ public void testUnparsableLongForAggregationMaxCountInConfig() {
          // ----------------------------------------------------------------------

          @Test

          -	public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception {
          +	public void testSnapshotStateShouldBeEmptyListIfSourceNotOpened() throws Exception {
           		Properties config = new Properties();
           		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
           		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
           		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

           		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);

          -		assertTrue(consumer.snapshotState(123, 123) == null); //arbitrary checkpoint id and timestamp
          +		assertTrue(consumer.snapshotState(123, 123).size() == 0); //arbitrary checkpoint id and timestamp

          — End diff —

          For actual comparisons, it is nice to use `assertEquals`, because it shows you the actual value in the error message.
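          For illustration, a minimal JUnit 4 sketch (hypothetical values, not the connector test itself) of the difference in failure output:

          ```java
          import static org.junit.Assert.assertEquals;

          import java.util.Collections;
          import java.util.List;

          import org.junit.Test;

          public class AssertStyleSketch {

              @Test
              public void emptySnapshot() {
                  // pretend this is what snapshotState(...) returned
                  List<Integer> snapshot = Collections.emptyList();

                  // assertTrue(snapshot.size() == 0) fails with a bare AssertionError,
                  // hiding the actual size; assertEquals fails with
                  // "expected:<0> but was:<N>", putting the actual value in the message.
                  assertEquals(0, snapshot.size());
              }
          }
          ```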

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r92366454

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -267,20 +269,20 @@ public void close() throws Exception {
          // ------------------------------------------------------------------------

          @Override

          -	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
          +	public List<Tuple2<KinesisStreamShard, SequenceNumber>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
           		if (lastStateSnapshot == null) {
           			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
          -			return null;
          +			return new ArrayList<>();

          — End diff —

          It's good style to avoid creating empty lists. Use `Collections.emptyList()` instead.
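          A small sketch of the suggestion (simplified return type, not the actual consumer code):

          ```java
          import java.util.Collections;
          import java.util.List;

          class EmptyResultSketch {

              static List<String> snapshotBeforeOpen() {
                  // return new ArrayList<>() allocates a fresh object on every call;
                  // Collections.emptyList() returns a shared immutable singleton, so
                  // there is no allocation and callers cannot mutate the result.
                  return Collections.emptyList();
              }
          }
          ```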

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r92366577

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -294,11 +296,18 @@ public void close() throws Exception

           				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
           		}

          -		return lastStateSnapshot;
          +		List<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new ArrayList<>();

          — End diff —

          Always initialize array lists with the expected size.
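          The point of the advice, in a small sketch (generic types, for illustration only): pre-sizing avoids the incremental array growth a default-capacity `ArrayList` goes through.

          ```java
          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;

          class PresizedListSketch {

              static <K, V> List<Map.Entry<K, V>> entriesOf(HashMap<K, V> map) {
                  // With the expected size passed up front, the backing array is
                  // allocated once instead of being repeatedly copied as it grows
                  // from the default capacity of 10.
                  List<Map.Entry<K, V>> entries = new ArrayList<>(map.size());
                  entries.addAll(map.entrySet());
                  return entries;
              }
          }
          ```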

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r92554828

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -294,11 +296,18 @@ public void close() throws Exception

           				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
           		}

          -		return lastStateSnapshot;
          +		List<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new ArrayList<>(lastStateSnapshot.size());
          +		for (Map.Entry<KinesisStreamShard, SequenceNumber> entry: lastStateSnapshot.entrySet()) {

          — End diff —

          formatting nit: needs a space before the colon `:`

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r92554905

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -294,11 +296,18 @@ public void close() throws Exception

           				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
           		}

          -		return lastStateSnapshot;
          +		List<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new ArrayList<>(lastStateSnapshot.size());
          +		for (Map.Entry<KinesisStreamShard, SequenceNumber> entry: lastStateSnapshot.entrySet()) {
          +			listState.add(Tuple2.of(entry.getKey(), entry.getValue()));
          +		}
          +
          +		return listState;
           	}

           	@Override
          -	public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
          -		sequenceNumsToRestore = restoredState;
          +	public void restoreState(List<Tuple2<KinesisStreamShard, SequenceNumber>> state) throws Exception {
          +		sequenceNumsToRestore = new HashMap<>();
          +		for (Tuple2<KinesisStreamShard, SequenceNumber> subState: state) {

          — End diff —

          formatting nit: needs a space before the colon ":"

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r92555340

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -294,11 +296,18 @@ public void close() throws Exception

           				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
           		}

          -		return lastStateSnapshot;
          +		List<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new ArrayList<>(lastStateSnapshot.size());
          +		for (Map.Entry<KinesisStreamShard, SequenceNumber> entry: lastStateSnapshot.entrySet()) {
          +			listState.add(Tuple2.of(entry.getKey(), entry.getValue()));
          +		}
          +
          +		return listState;
           	}

           	@Override
          -	public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
          -		sequenceNumsToRestore = restoredState;
          +	public void restoreState(List<Tuple2<KinesisStreamShard, SequenceNumber>> state) throws Exception {
          +		sequenceNumsToRestore = new HashMap<>();
          +		for (Tuple2<KinesisStreamShard, SequenceNumber> subState: state) {

          — End diff —

          We should probably do a null check here for `state`.
          From the looks of #3005, I don't think the restored state will ever be null (it will be an empty list), but it'd be good to make the code here independent of that.
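          A minimal sketch of such a guard (generic placeholder types, not the actual connector signature):

          ```java
          import java.util.AbstractMap.SimpleEntry;
          import java.util.HashMap;
          import java.util.List;

          class RestoreGuardSketch<K, V> {

              private HashMap<K, V> sequenceNumsToRestore;

              void restoreState(List<SimpleEntry<K, V>> state) {
                  sequenceNumsToRestore = new HashMap<>();
                  // Defensive null check: the runtime is expected to hand in an empty
                  // list rather than null, but the restore logic stays independent of
                  // that contract.
                  if (state == null) {
                      return;
                  }
                  for (SimpleEntry<K, V> entry : state) {
                      sequenceNumsToRestore.put(entry.getKey(), entry.getValue());
                  }
              }
          }
          ```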

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3001

          @tzulitai as you wish =)

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3001

          @tony810430 (cc @StephanEwen, f.y.i.)
          At a second, closer look, I'm afraid this PR can't be merged as is. The problem is that the state redistribution of `ListCheckpointed` doesn't work with the Kinesis consumer's current shard discovery mechanism.

          On restore, each subtask uses the restored state it gets to set its "last seen shard ID". With this value set, the subtask only discovers shards that come after the "last seen shard ID". The subtask then determines which of the newly discovered shards it is responsible for consuming, using a simple modulo operation on the shards' hash values.

          This worked before, when restored state could not be redistributed, because each subtask was always restored with exactly the shards that belong to it (i.e., via the modulo-on-hash operation).

          The state redistribution on restore for `ListCheckpointed` breaks this. For example:
          A job starts with only 1 subtask for FlinkKinesisConsumer, and the Kinesis stream has 2 shards:
          subtask #1 --> shard1, shard2

          After a restore with parallelism increased to 2, let's say the list state gets redistributed as:
          subtask #1 --> shard1
          subtask #2 --> shard2

          Subtask #1's last seen shard ID will be set to shard1, so it will afterwards discover shard2 as a new shard. If shard2 gets hashed to subtask #1, we'll have both subtasks consuming shard2.

          Changing the hashing / subtask-to-shard assignment for shard discovery probably can't solve the problem, because no matter how we change it, the outcome still depends on how the list state happens to be redistributed.

          The only way I can see to solve this would be to have merged state on restore, so that every subtask can set its "last seen shard ID" to the largest ID across all subtasks, not just its own.

          In FLIP-8 I see the community has also discussed an interface for merged state (a unioned list state on restore). I think that will be really useful in this particular case. It will also be relevant for the Kafka connector; right now it only seems irrelevant because the Kafka consumer doesn't have partition discovery yet.
          @StefanRRichter could you provide some insight on the merged state aspect? I'm not that familiar yet with the recent work and progress on the repartitionable states.
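          To make the scenario concrete, a runnable sketch of the assignment logic described above (simplified: plain strings instead of `KinesisStreamShard`, and a made-up hash scheme, so not the consumer's actual code):

          ```java
          import java.util.Arrays;
          import java.util.List;

          class ShardAssignmentSketch {

              // Newly discovered shards are claimed via a simple modulo on a hash,
              // mirroring the subtask-to-shard assignment described above.
              static boolean ownedBySubtask(String shardId, int subtaskIndex, int parallelism) {
                  return Math.abs(shardId.hashCode() % parallelism) == subtaskIndex;
              }

              public static void main(String[] args) {
                  int parallelism = 2;
                  List<String> shards = Arrays.asList("shard1", "shard2");

                  // Suppose subtask #1 restored only shard1, so its "last seen shard ID"
                  // is shard1 and it re-discovers shard2 afterwards, while subtask #2
                  // restored shard2 directly. If shard2 also hashes to subtask #1,
                  // both subtasks end up consuming shard2.
                  for (String shard : shards) {
                      for (int subtask = 0; subtask < parallelism; subtask++) {
                          if (ownedBySubtask(shard, subtask, parallelism)) {
                              System.out.println(shard + " claimed by subtask #" + subtask);
                          }
                      }
                  }
              }
          }
          ```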

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3001

          I have an open PR #2948 that would introduce all the facilities for global and union state. It is just a matter of also exposing it to the user. I think at least for the union state, this can trivially be done once that is in.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3001

          Thanks for the info Stefan! @tony810430 we'll probably need to block this PR for now, and refresh it once the unioned state interface comes around.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3001

          OK. Thanks for pointing out this problem.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3001

          FYI, I updated my PR so that broadcast state is now exposed. However, at least for now you would need to use the more powerful `CheckpointedFunction` (usage examples are currently just in the tests) over the simpler `ListCheckpointed`.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3001

          Hi @tony810430!

          Sorry for the long pause on this PR. After some back-and-forth offline discussion with others on how exactly we want to proceed with this, we decided to stick with using union state to cope with the shard-discovery-on-restore problem (at least for 1.3.0). Therefore, we can finally continue the work here :-D

          First of all, to use union state we should use `CheckpointedFunction` instead of `ListCheckpointed`. There is a PR for exposing union state through the public API (#3508), but in case that isn't merged within the next few days, you don't need to be blocked on it when you continue your work on this PR. For now, you can cast the operator state store instance retrieved through the `FunctionInitializationContext` to `DefaultOperatorStateStore` to use broadcast state.

          One thing to also note, which is missing in your previous work on this, is that we need a migration path from the old state access (i.e., via `CheckpointedAsynchronously`) to the new state (i.e., `CheckpointedFunction`).

          The `FlinkKafkaConsumerBase` class in the Kafka connector provides a very good example of how to do this. Simply put, in the end the `FlinkKinesisConsumer` should implement both `CheckpointedRestoring` and `CheckpointedFunction`, and bridge the old state read in the legacy `restoreState(...)` method over to the new `initializeState(...)` method. The bridge is simply a field variable in the consumer class, as sketched below.

          The `FlinkKafkaConsumerBase` also serves as a good example of how to use the `CheckpointedFunction` if you have questions there.
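          Roughly, that bridge pattern looks like this (a sketch with simplified stand-in interfaces and state types, not the actual Flink signatures):

          ```java
          import java.util.HashMap;

          // Simplified stand-ins for the Flink interfaces named above.
          interface LegacyCheckpointedRestoring<T> {
              void restoreState(T state) throws Exception;
          }

          interface NewCheckpointedFunction {
              void initializeState(Object context) throws Exception;
              void snapshotState(Object context) throws Exception;
          }

          class MigrationBridgeSketch
                  implements LegacyCheckpointedRestoring<HashMap<String, String>>, NewCheckpointedFunction {

              // The "bridge": a field that legacy state lands in first.
              private HashMap<String, String> sequenceNumsToRestore;

              @Override
              public void restoreState(HashMap<String, String> restoredState) {
                  // Invoked when restoring from an old-format savepoint.
                  sequenceNumsToRestore = restoredState;
              }

              @Override
              public void initializeState(Object context) {
                  // Invoked on every (re)start. If the legacy hook already populated
                  // the bridge field, use it; otherwise read the new union list state
                  // from the context here.
                  if (sequenceNumsToRestore == null) {
                      // read new-style operator state ...
                  }
              }

              @Override
              public void snapshotState(Object context) {
                  // always write checkpoints in the new state format
              }
          }
          ```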

          Let me know if you have any questions with this, and feel free to ping me any time!

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3001

          Hi @tzulitai

          Ok, I see. Thanks for your reminder. =)

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3001

          Hi @tzulitai,

          I have finished implementing union state with `CheckpointedFunction` and `CheckpointedRestoring`.
          I also added some tests for handling the union state and the migration path from legacy state.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3001

          Thanks a lot for your work on this. I'll try to review it early next week.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3001

          Hi @tony810430,
          The exposure for union list state was just merged to master.
          Could you rebase this? Once rebased, I'll do a full review so that we can finally work towards merging this.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3001

          Hi @tzulitai,
          I have rebased it and updated it to use the public API.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112122128

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -267,38 +293,84 @@ public void close() throws Exception {
          // ------------------------------------------------------------------------

           	@Override
          -	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
          +	public void snapshotState(FunctionSnapshotContext context) throws Exception {
           		if (lastStateSnapshot == null) {
           			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
          -			return null;
          -		}
          -
          -		if (fetcher == null) {
          +		} else if (fetcher == null) {
           			LOG.debug("snapshotState() requested on not yet running source; returning null.");
          -			return null;
          -		}
          -
          -		if (!running) {
          +		} else if (!running) {
           			LOG.debug("snapshotState() called on closed source; returning null.");
          -			return null;
          +		} else {
          +			if (LOG.isDebugEnabled()) {
          +				LOG.debug("Snapshotting state ...");
          +			}
          +
          +			offsetsStateForCheckpoint.clear();
          +			lastStateSnapshot = fetcher.snapshotState();
          +
          +			if (LOG.isDebugEnabled()) {
          +				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
          +					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
          +			}
          +
          +			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
          +				offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
          +			}
           		}
          +	}

          -		if (LOG.isDebugEnabled()) {
          -			LOG.debug("Snapshotting state ...");
          +	@Override
          +	public void initializeState(FunctionInitializationContext context) throws Exception {
          +		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
          +			TypeInformation.of(KinesisStreamShard.class),
          +			TypeInformation.of(SequenceNumber.class)
          +		);
          +
          +		offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
          +			new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, tuple));
          +
          +		if (context.isRestored()) {
          +			if (sequenceNumsToRestore == null) {
          +				sequenceNumsToRestore = new HashMap<>();
          +				for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : offsetsStateForCheckpoint.get()) {
          +					sequenceNumsToRestore.put(kinesisOffset.f0, kinesisOffset.f1);
          +				}
          +
          +				LOG.info("Setting restore state in the FlinkKinesisConsumer.");
          +				if (LOG.isDebugEnabled()) {
          +					LOG.debug("Using the following offsets: {}", sequenceNumsToRestore);
          +				}
          +			} else if (sequenceNumsToRestore.isEmpty()) {
          +				sequenceNumsToRestore = null;
          +			}
          +		} else {
          +			LOG.info("No restore state for FlinkKinesisConsumer.");
           		}
          +	}

          -		lastStateSnapshot = fetcher.snapshotState();
          +	@Override
          +	public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
          +		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",

          — End diff —

          I think the other log messages follow a different format than this one.
          The others do something like `Subtask {} is restoring offset from an older version`?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112123595

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -194,26 +216,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
          // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
          // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
          // can potentially have new shards to subscribe to later on

          -		fetcher = new KinesisDataFetcher<>(
          -			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
          +		fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

           		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
           		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

           		// if we are restoring from a checkpoint, we iterate over the restored
           		// state and accordingly seed the fetcher with subscribed shards states
           		if (isRestoringFromFailure) {

          — End diff —

          @tony810430 could you briefly explain the changes here in this "if (isRestoringFromFailure)" block? I don't see how it's relevant to the rescaling change. I may be missing something, though, so it would be great if you could explain a bit.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112122515

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -267,38 +293,84 @@ public void close() throws Exception {
          // ------------------------------------------------------------------------

           	@Override
          -	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
          +	public void snapshotState(FunctionSnapshotContext context) throws Exception {
           		if (lastStateSnapshot == null) {
           			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
          -			return null;
          -		}
          -
          -		if (fetcher == null) {
          +		} else if (fetcher == null) {
           			LOG.debug("snapshotState() requested on not yet running source; returning null.");
          -			return null;
          -		}
          -
          -		if (!running) {
          +		} else if (!running) {
           			LOG.debug("snapshotState() called on closed source; returning null.");
          -			return null;
          +		} else {
          +			if (LOG.isDebugEnabled()) {
          +				LOG.debug("Snapshotting state ...");
          +			}
          +
          +			offsetsStateForCheckpoint.clear();
          +			lastStateSnapshot = fetcher.snapshotState();
          +
          +			if (LOG.isDebugEnabled()) {
          +				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
          +					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
          +			}
          +
          +			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
          +				offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
          +			}
           		}
          +	}

          -		if (LOG.isDebugEnabled()) {
          -			LOG.debug("Snapshotting state ...");
          +	@Override
          +	public void initializeState(FunctionInitializationContext context) throws Exception {
          +		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
          +			TypeInformation.of(KinesisStreamShard.class),
          +			TypeInformation.of(SequenceNumber.class)
          +		);
          +
          +		offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
          +			new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, tuple));
          +
          +		if (context.isRestored()) {
          +			if (sequenceNumsToRestore == null) {
          +				sequenceNumsToRestore = new HashMap<>();
          +				for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : offsetsStateForCheckpoint.get()) {
          +					sequenceNumsToRestore.put(kinesisOffset.f0, kinesisOffset.f1);
          +				}
          +
          +				LOG.info("Setting restore state in the FlinkKinesisConsumer.");
          +				if (LOG.isDebugEnabled()) {
          +					LOG.debug("Using the following offsets: {}", sequenceNumsToRestore);
          +				}
          +			} else if (sequenceNumsToRestore.isEmpty()) {
          +				sequenceNumsToRestore = null;
          +			}
          +		} else {
          +			LOG.info("No restore state for FlinkKinesisConsumer.");
           		}
          +	}

          -		lastStateSnapshot = fetcher.snapshotState();
          +	@Override
          +	public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
          +		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
          +			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
          +
          +		sequenceNumsToRestore = restoredState.isEmpty() ? null : restoredState;

           		if (LOG.isDebugEnabled()) {
          -			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
          -				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
          +			LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
          +				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore);

          — End diff —

          Same here. I think this debug info can just be integrated with the previous info log message.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112123170

          — Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java —
          @@ -559,48 +699,298 @@ public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoi

          @Test
          @SuppressWarnings("unchecked")
          + public void testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws Exception {
          + HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
          +
          + KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
          + List<KinesisStreamShard> shards = new ArrayList<>();
          + shards.addAll(fakeRestoredState.keySet());
          + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
          + PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
          +
          + // assume the given config is correct
          + PowerMockito.mockStatic(KinesisConfigUtil.class);
          + PowerMockito.doNothing().when(KinesisConfigUtil.class);
          +
          + TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
          + "fakeStream", new Properties(), 10, 2);
          + consumer.restoreState(fakeRestoredState);
          + consumer.open(new Configuration());
          + consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
          +
          + Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
          + for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet())

          { + Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream( + restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); + Mockito.verify(mockedFetcher).registerNewSubscribedShardState( + new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); + }
          + }
          +
          + @Test
          + @SuppressWarnings("unchecked")
          public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
          + // ----------------------------------------------------------------------
          + // setting initial state
          + // ----------------------------------------------------------------------
          + HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
          +
          + // ----------------------------------------------------------------------
          + // mock operator state backend and initial state for initializeState()
          + // ----------------------------------------------------------------------
          + TestingListState<Serializable> listState = new TestingListState<>();
          + for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
          + listState.add(Tuple2.of(state.getKey(), state.getValue()));
          + }
          +
          + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
          + when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
          +
          + StateInitializationContext initializationContext = mock(StateInitializationContext.class);
          + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
          + when(initializationContext.isRestored()).thenReturn(true);
          +
          + // ----------------------------------------------------------------------
          + // mock fetcher
          + // ----------------------------------------------------------------------
          KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
          + List<KinesisStreamShard> shards = new ArrayList<>();
          + shards.addAll(fakeRestoredState.keySet());
          + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
          PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);

          // assume the given config is correct
          PowerMockito.mockStatic(KinesisConfigUtil.class);
          PowerMockito.doNothing().when(KinesisConfigUtil.class);

          - HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
          - fakeRestoredState.put(
          - new KinesisStreamShard("fakeStream1",
          - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
          - new SequenceNumber(UUID.randomUUID().toString()));
          - fakeRestoredState.put(
          - new KinesisStreamShard("fakeStream1",
          - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
          - new SequenceNumber(UUID.randomUUID().toString()));
          - fakeRestoredState.put(
          - new KinesisStreamShard("fakeStream1",
          - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
          - new SequenceNumber(UUID.randomUUID().toString()));
          - fakeRestoredState.put(
          - new KinesisStreamShard("fakeStream2",
          - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
          - new SequenceNumber(UUID.randomUUID().toString()));
          - fakeRestoredState.put(
          - new KinesisStreamShard("fakeStream2",
          - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
          - new SequenceNumber(UUID.randomUUID().toString()));
          + // ----------------------------------------------------------------------
          + // start to test seed initial state to fetcher
          + // ----------------------------------------------------------------------
          + TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
          + "fakeStream", new Properties(), 10, 2);
          + consumer.initializeState(initializationContext);
          + consumer.open(new Configuration());
          + consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
          +
          + Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
          + for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
          + Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
          + restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
          + Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
          + new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
          + }
          + }
          +
          + @Test
          + @SuppressWarnings("unchecked")
          + public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exception {
          + // ----------------------------------------------------------------------
          + // setting initial state
          + // ----------------------------------------------------------------------
          + HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
          +
          + HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
          +
          + // ----------------------------------------------------------------------
          + // mock operator state backend and initial state for initializeState()
          + // ----------------------------------------------------------------------
          + TestingListState<Serializable> listState = new TestingListState<>();
          + for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet())

          { + listState.add(Tuple2.of(state.getKey(), state.getValue())); + }
          + for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredStateForOthers.entrySet()) { + listState.add(Tuple2.of(state.getKey(), state.getValue())); + }

          + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
          + when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
          +
          + StateInitializationContext initializationContext = mock(StateInitializationContext.class);
          + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
          + when(initializationContext.isRestored()).thenReturn(true);
          +
          + // ----------------------------------------------------------------------
          + // mock fetcher
          + // ----------------------------------------------------------------------
          + KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
          + List<KinesisStreamShard> shards = new ArrayList<>();
          + shards.addAll(fakeRestoredState.keySet());
          + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
          + PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
          +
          + // assume the given config is correct
          + PowerMockito.mockStatic(KinesisConfigUtil.class);
          + PowerMockito.doNothing().when(KinesisConfigUtil.class);
          +
          + // ----------------------------------------------------------------------
          + // start to test seed initial state to fetcher
          + // ----------------------------------------------------------------------
          TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
          "fakeStream", new Properties(), 10, 2);

          • consumer.restoreState(fakeRestoredState);
            + consumer.initializeState(initializationContext);
            consumer.open(new Configuration());
            consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

          Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
          + for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet())

          { + // should never get restored state not belonging to itself + Mockito.verify(mockedFetcher, never()).advanceLastDiscoveredShardOfStream( + restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); + Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState( + new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); + }

          for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet())

          { + // should get restored state belonging to itself Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream( restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); Mockito.verify(mockedFetcher).registerNewSubscribedShardState( new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); }

          }
          +
          + @Test
          + @SuppressWarnings("unchecked")
          + /*
          + * If the original parallelism is 2 and states is:
          + * Consumer subtask 1:
          + * stream1, shard1, SequentialNumber(xxx)
          + * Consumer subtask 2:
          + * stream1, shard2, SequentialNumber(yyy)
          + * After discoverNewShardsToSubscribe() if there are two shards (shard3, shard4) been created:
          + * Consumer subtask 1 (late for discoverNewShardsToSubscribe()):
          + * stream1, shard1, SequentialNumber(xxx)
          + * Consumer subtask 2:
          + * stream1, shard2, SequentialNumber(yyy)
          + * stream1, shard4, SequentialNumber(zzz)
          + * If snapshotState() occur and parallelism is changed to 1:
          + * Broadcast state will be:
          — End diff –

          "Union" state

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112121870

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -267,38 +293,84 @@ public void close() throws Exception {
 	// ------------------------------------------------------------------------

 	@Override
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (lastStateSnapshot == null) {
 			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-
-		if (fetcher == null) {
+		} else if (fetcher == null) {
 			LOG.debug("snapshotState() requested on not yet running source; returning null.");
-			return null;
-		}
-
-		if (!running) {
+		} else if (!running) {
 			LOG.debug("snapshotState() called on closed source; returning null.");
-			return null;
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotting state ...");
+			}
+
+			offsetsStateForCheckpoint.clear();
+			lastStateSnapshot = fetcher.snapshotState();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+			}
+
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+			}
 		}
+	}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state ...");
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
+			TypeInformation.of(KinesisStreamShard.class),
+			TypeInformation.of(SequenceNumber.class)
+		);
+
+		offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
+			new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, tuple));
— End diff –

          We can probably have a more meaningful state name now.
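For example, something along these lines; a sketch only, and the descriptor name here is made up, not necessarily the name that was eventually merged:

    // Hypothetical descriptive name instead of DEFAULT_OPERATOR_STATE_NAME,
    // so the state is identifiable when inspecting savepoints.
    ListStateDescriptor<Tuple2<KinesisStreamShard, SequenceNumber>> descriptor =
        new ListStateDescriptor<>("kinesis-stream-shard-sequence-numbers", tuple);
    offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(descriptor);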

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112122336

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -267,38 +293,84 @@ public void close() throws Exception {
 	// ------------------------------------------------------------------------

 	@Override
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (lastStateSnapshot == null) {
 			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-
-		if (fetcher == null) {
+		} else if (fetcher == null) {
 			LOG.debug("snapshotState() requested on not yet running source; returning null.");
-			return null;
-		}
-
-		if (!running) {
+		} else if (!running) {
 			LOG.debug("snapshotState() called on closed source; returning null.");
-			return null;
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotting state ...");
+			}
+
+			offsetsStateForCheckpoint.clear();
+			lastStateSnapshot = fetcher.snapshotState();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+			}
+
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+			}
 		}
+	}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state ...");
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
+			TypeInformation.of(KinesisStreamShard.class),
+			TypeInformation.of(SequenceNumber.class)
+		);
+
+		offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
+			new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, tuple));
+
+		if (context.isRestored()) {
+			if (sequenceNumsToRestore == null) {
+				sequenceNumsToRestore = new HashMap<>();
+				for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : offsetsStateForCheckpoint.get()) {
— End diff –

I would rename this to `kinesisSeqNum` for consistency, because "offset" is the Kafka term while "sequence number" is the Kinesis term.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112122018

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -267,38 +293,84 @@ public void close() throws Exception {
 	// ------------------------------------------------------------------------

 	@Override
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (lastStateSnapshot == null) {
 			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-
-		if (fetcher == null) {
+		} else if (fetcher == null) {
 			LOG.debug("snapshotState() requested on not yet running source; returning null.");
-			return null;
-		}
-
-		if (!running) {
+		} else if (!running) {
 			LOG.debug("snapshotState() called on closed source; returning null.");
-			return null;
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotting state ...");
+			}
+
+			offsetsStateForCheckpoint.clear();
+			lastStateSnapshot = fetcher.snapshotState();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+			}
+
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+			}
 		}
+	}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state ...");
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
+			TypeInformation.of(KinesisStreamShard.class),
+			TypeInformation.of(SequenceNumber.class)
+		);
+
+		offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
+			new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, tuple));
+
+		if (context.isRestored()) {
+			if (sequenceNumsToRestore == null) {
+				sequenceNumsToRestore = new HashMap<>();
+				for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : offsetsStateForCheckpoint.get()) {
+					sequenceNumsToRestore.put(kinesisOffset.f0, kinesisOffset.f1);
+				}
+
+				LOG.info("Setting restore state in the FlinkKinesisConsumer.");
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Using the following offsets: {}", sequenceNumsToRestore);
+				}
— End diff –

I would simply include `sequenceNumsToRestore` as part of the info log message.
It's actually quite useful information for debugging, and users normally do not enable debug-level logging for production jobs.
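I.e., roughly (a sketch of the suggested single log statement):

    // Log the restored sequence numbers at INFO so the information is
    // available in production logs without enabling debug level.
    LOG.info("Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {}",
        sequenceNumsToRestore);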

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112122730

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -66,31 +80,48 @@
 	// Consumer properties
 	// ------------------------------------------------------------------------

-	/** The names of the Kinesis streams that we will be consuming from */
+	/**
+	 * The names of the Kinesis streams that we will be consuming from
+	 */
 	private final List<String> streams;

-	/** Properties to parametrize settings such as AWS service region, initial position in stream,
-	 * shard list retrieval behaviours, etc */
+	/**
+	 * Properties to parametrize settings such as AWS service region, initial position in stream,
+	 * shard list retrieval behaviours, etc
+	 */
 	private final Properties configProps;

-	/** User supplied deseriliazation schema to convert Kinesis byte messages to Flink objects */
+	/**
+	 * User supplied deseriliazation schema to convert Kinesis byte messages to Flink objects
+	 */
 	private final KinesisDeserializationSchema<T> deserializer;

 	// ------------------------------------------------------------------------
 	// Runtime state
 	// ------------------------------------------------------------------------

-	/** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
+	/**
+	 * Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards
+	 */
 	private transient KinesisDataFetcher<T> fetcher;

-	/** The sequence numbers in the last state snapshot of this subtask */
+	/**
+	 * The sequence numbers in the last state snapshot of this subtask
+	 */
 	private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot;

-	/** The sequence numbers to restore to upon restore from failure */
+	/**
+	 * The sequence numbers to restore to upon restore from failure
+	 */
 	private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;

 	private volatile boolean running = true;

+	// ------------------------------------------------------------------------
+	// State for Checkpoint
+	// ------------------------------------------------------------------------
+
+	private transient ListState<Tuple2<KinesisStreamShard, SequenceNumber>> offsetsStateForCheckpoint;
— End diff –

          "offset" is the Kafka term.
          I would try to rename this to use "sequence number" instead (or a likewise abbreviation).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112123765

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -194,26 +216,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
 		// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
 		// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
 		// can potentially have new shards to subscribe to later on
-		fetcher = new KinesisDataFetcher<>(
-			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+		fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

 		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
 		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

 		// if we are restoring from a checkpoint, we iterate over the restored
 		// state and accordingly seed the fetcher with subscribed shards states
 		if (isRestoringFromFailure) {
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
+			List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
+			for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
 				fetcher.advanceLastDiscoveredShardOfStream(
-					restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
+					shard.getStreamName(), shard.getShard().getShardId());
+
+				SequenceNumber startingStateForNewShard = lastStateSnapshot.containsKey(shard)
+					? lastStateSnapshot.get(shard)
+					: SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
— End diff –

          Shouldn't the case always be `SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()` for newly discovered shards?
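For reference, the branch in question distinguishes two cases on restore; paraphrased here as a standalone sketch, with variable names that are illustrative rather than taken from the final code:

    // A discovered shard that appears in the restored snapshot resumes from
    // its checkpointed sequence number; a shard created while the job was
    // down falls back to the earliest sentinel.
    for (KinesisStreamShard shard : fetcher.discoverNewShardsToSubscribe()) {
        SequenceNumber start = lastStateSnapshot.containsKey(shard)
            ? lastStateSnapshot.get(shard)                                    // shard was checkpointed: resume
            : SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();    // shard is new: read from earliest
        fetcher.registerNewSubscribedShardState(new KinesisStreamShardState(shard, start));
    }

The question above is whether the containsKey branch can ever be taken for a shard reported by discoverNewShardsToSubscribe(), or whether such shards are by definition absent from the snapshot.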

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112123126

          — Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java —
@@ -559,48 +699,298 @@ public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoi

 	@Test
 	@SuppressWarnings("unchecked")
+	public void testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws Exception {
+		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+
+		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		List<KinesisStreamShard> shards = new ArrayList<>();
+		shards.addAll(fakeRestoredState.keySet());
+		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
+		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+
+		// assume the given config is correct
+		PowerMockito.mockStatic(KinesisConfigUtil.class);
+		PowerMockito.doNothing().when(KinesisConfigUtil.class);
+
+		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
+			"fakeStream", new Properties(), 10, 2);
+		consumer.restoreState(fakeRestoredState);
+		consumer.open(new Configuration());
+		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
+				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
+			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+		}
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
 	public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
+		// ----------------------------------------------------------------------
+		// setting initial state
+		// ----------------------------------------------------------------------
+		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+
+		// ----------------------------------------------------------------------
+		// mock operator state backend and initial state for initializeState()
+		// ----------------------------------------------------------------------
+		TestingListState<Serializable> listState = new TestingListState<>();
+		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
+			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		}
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.isRestored()).thenReturn(true);
+
+		// ----------------------------------------------------------------------
+		// mock fetcher
+		// ----------------------------------------------------------------------
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		List<KinesisStreamShard> shards = new ArrayList<>();
+		shards.addAll(fakeRestoredState.keySet());
+		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);

 		// assume the given config is correct
 		PowerMockito.mockStatic(KinesisConfigUtil.class);
 		PowerMockito.doNothing().when(KinesisConfigUtil.class);

-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
-		fakeRestoredState.put(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
-			new SequenceNumber(UUID.randomUUID().toString()));
-		fakeRestoredState.put(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
-			new SequenceNumber(UUID.randomUUID().toString()));
-		fakeRestoredState.put(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
-			new SequenceNumber(UUID.randomUUID().toString()));
-		fakeRestoredState.put(
-			new KinesisStreamShard("fakeStream2",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
-			new SequenceNumber(UUID.randomUUID().toString()));
-		fakeRestoredState.put(
-			new KinesisStreamShard("fakeStream2",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
-			new SequenceNumber(UUID.randomUUID().toString()));
+		// ----------------------------------------------------------------------
+		// start to test seed initial state to fetcher
+		// ----------------------------------------------------------------------
+		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
+			"fakeStream", new Properties(), 10, 2);
+		consumer.initializeState(initializationContext);
+		consumer.open(new Configuration());
+		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
+				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
+			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+		}
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exception {
+		// ----------------------------------------------------------------------
+		// setting initial state
+		// ----------------------------------------------------------------------
+		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
+
+		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
+
+		// ----------------------------------------------------------------------
+		// mock operator state backend and initial state for initializeState()
+		// ----------------------------------------------------------------------
+		TestingListState<Serializable> listState = new TestingListState<>();
+		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
+			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		}
+		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredStateForOthers.entrySet()) {
+			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		}
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.isRestored()).thenReturn(true);
+
+		// ----------------------------------------------------------------------
+		// mock fetcher
+		// ----------------------------------------------------------------------
+		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		List<KinesisStreamShard> shards = new ArrayList<>();
+		shards.addAll(fakeRestoredState.keySet());
+		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
+		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+
+		// assume the given config is correct
+		PowerMockito.mockStatic(KinesisConfigUtil.class);
+		PowerMockito.doNothing().when(KinesisConfigUtil.class);
+
+		// ----------------------------------------------------------------------
+		// start to test seed initial state to fetcher
+		// ----------------------------------------------------------------------
 		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
 			"fakeStream", new Properties(), 10, 2);
-		consumer.restoreState(fakeRestoredState);
+		consumer.initializeState(initializationContext);
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

 		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
+			// should never get restored state not belonging to itself
+			Mockito.verify(mockedFetcher, never()).advanceLastDiscoveredShardOfStream(
+				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
+			Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+		}
 		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+			// should get restored state belonging to itself
 			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
 				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
 				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
+
+	@Test
+	@SuppressWarnings("unchecked")
— End diff –

          Should place these annotations after the comment block. I think that's the usual convention.
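I.e., roughly (a sketch of the suggested ordering; the method name is a placeholder):

    // Suggested ordering: comment block first, then annotations, then the method.
    /*
     * Describes the rescaling scenario under test.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testConsumerStateAfterRescaling() throws Exception {
        // test body
    }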

+ when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); + + StateInitializationContext initializationContext = mock(StateInitializationContext.class); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.isRestored()).thenReturn(true); + + // ---------------------------------------------------------------------- + // mock fetcher + // ---------------------------------------------------------------------- + KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class); + List<KinesisStreamShard> shards = new ArrayList<>(); + shards.addAll(fakeRestoredState.keySet()); + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards); + PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher); + + // assume the given config is correct + PowerMockito.mockStatic(KinesisConfigUtil.class); + PowerMockito.doNothing().when(KinesisConfigUtil.class); + + // ---------------------------------------------------------------------- + // start to test seed initial state to fetcher + // ---------------------------------------------------------------------- TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer( "fakeStream", new Properties(), 10, 2); consumer.restoreState(fakeRestoredState); + consumer.initializeState(initializationContext); consumer.open(new Configuration()); consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true); + for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) { + // should never get restored state not belonging to itself + Mockito.verify(mockedFetcher, never()).advanceLastDiscoveredShardOfStream( + restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); + Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState( + new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); + } for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) { + // should get restored state belonging to itself Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream( restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); Mockito.verify(mockedFetcher).registerNewSubscribedShardState( new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); } } + + @Test + @SuppressWarnings("unchecked") — End diff – Should place these annotations after the comment block. I think that's the usual convention.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112132994

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -194,26 +216,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
 		// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
 		// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
 		// can potentially have new shards to subscribe to later on
-		fetcher = new KinesisDataFetcher<>(
-			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+		fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

 		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
 		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

 		// if we are restoring from a checkpoint, we iterate over the restored
 		// state and accordingly seed the fetcher with subscribed shards states
 		if (isRestoringFromFailure) {
          — End diff –

          @tzulitai
          Because `KinesisDataFetcher.discoverNewShardsToSubscribe()` always discovers new shards starting from the latest discovered shard id, a problem can arise if some subtasks fail to discover new shards before rescaling and the new subtasks after rescaling also miss those shards.

          See the comment on `FlinkKinesisConsumerTest.testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard()` for a concrete example.

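          To make the restore-time seeding concrete, here is a minimal sketch of the path under discussion; it mirrors the diff above, with the fields `fetcher` and `lastStateSnapshot` assumed to be in scope:

          ```
          // Minimal sketch of the restore-time seeding (mirrors the diff above; `fetcher` and
          // `lastStateSnapshot` are assumed to be in scope). One discovery round runs before the
          // fetch loop so that shards created while the job was down, or missed by other
          // subtasks before rescaling, are still picked up.
          List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
          for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
          	// mark the shard as seen so the next discovery round starts after its shard id
          	fetcher.advanceLastDiscoveredShardOfStream(
          		shard.getStreamName(), shard.getShard().getShardId());

          	// restored shards resume from their checkpointed sequence number;
          	// genuinely new shards start from the EARLIEST sentinel
          	SequenceNumber startingStateForNewShard = lastStateSnapshot.containsKey(shard)
          		? lastStateSnapshot.get(shard)
          		: SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();

          	fetcher.registerNewSubscribedShardState(
          		new KinesisStreamShardState(shard, startingStateForNewShard));
          }
          ```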
          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112134112

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -194,26 +216,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
 		// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
 		// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
 		// can potentially have new shards to subscribe to later on
-		fetcher = new KinesisDataFetcher<>(
-			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+		fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

 		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
 		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

 		// if we are restoring from a checkpoint, we iterate over the restored
 		// state and accordingly seed the fetcher with subscribed shards states
 		if (isRestoringFromFailure) {
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
+			List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
+			for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
 				fetcher.advanceLastDiscoveredShardOfStream(
-					restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
+					shard.getStreamName(), shard.getShard().getShardId());
+
+				SequenceNumber startingStateForNewShard = lastStateSnapshot.containsKey(shard)
+					? lastStateSnapshot.get(shard)
+					: SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
— End diff –

          I just followed the behavior of `KinesisDataFetcher` when it starts consuming data from Kinesis: it always assigns `SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()` to newly discovered shards.
          It's okay to discuss here which initial sequence number is appropriate for new shards.

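          For reference, the running-time behavior being mirrored looks roughly like this (a sketch reconstructed from the description above, not the exact connector source):

          ```
          // Rough sketch of what the fetcher does for shards it discovers while running:
          // each newly discovered shard is registered with the EARLIEST sentinel, so it is
          // consumed from its beginning. (Reconstruction for illustration only.)
          for (KinesisStreamShard newShard : discoverNewShardsToSubscribe()) {
          	registerNewSubscribedShardState(new KinesisStreamShardState(
          		newShard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
          }
          ```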
          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3001

          @tzulitai
          I have addressed the comments. Thank you.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802072

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -194,26 +212,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
 		// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
 		// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
 		// can potentially have new shards to subscribe to later on
-		fetcher = new KinesisDataFetcher<>(
-			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+		fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

 		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
 		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

 		// if we are restoring from a checkpoint, we iterate over the restored
 		// state and accordingly seed the fetcher with subscribed shards states
 		if (isRestoringFromFailure) {
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
+			List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
+			for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
 				fetcher.advanceLastDiscoveredShardOfStream(
-					restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
+					shard.getStreamName(), shard.getShard().getShardId());
— End diff –

          Then I think this advancement is not required, right?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802344

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java —
@@ -458,7 +458,7 @@ public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
 	 * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
 	 *    that we have already seen before the next time this function is called
 	 */
-	private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
+	public List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
— End diff –

          We would probably need a big refactor of the Kinesis code to avoid exposing this method, by separating the concerns of partition discovery and record fetching. The exposure is not nice, but I think we have to live with it for now ...

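          As a purely hypothetical illustration of that refactor, the two concerns could be split behind separate interfaces; none of these names exist in the connector:

          ```
          // Hypothetical sketch only: separating shard discovery from record fetching would let
          // the consumer seed itself without making discoverNewShardsToSubscribe() public on the
          // fetcher. Both interface names are invented for illustration.
          interface ShardDiscoverer {
          	// poll Kinesis for shards not seen before and remember them as discovered
          	List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException;
          }

          interface RecordFetcher {
          	// consume records for the shards this subtask has subscribed to
          	void runFetchLoop() throws Exception;
          }
          ```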
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802078

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -194,26 +212,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
          // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
          // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
          // can potentially have new shards to subscribe to later on

          • fetcher = new KinesisDataFetcher<>(
          • streams, sourceContext, getRuntimeContext(), configProps, deserializer);
            + fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

          boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
          fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

          // if we are restoring from a checkpoint, we iterate over the restored
          // state and accordingly seed the fetcher with subscribed shards states
          if (isRestoringFromFailure) {

          • for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
            + List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
              • End diff –

          We should add a comment within the code on why we need this.

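          For example, the in-code comment could read along these lines (suggested wording only, not committed code):

          ```
          // Suggested wording only:
          //
          // Run one shard-discovery pass while restoring, before the fetch loop starts.
          // Discovery always resumes from the last discovered shard id, so shards created
          // while the job was down (or missed by other subtasks before rescaling) would
          // otherwise never be subscribed.
          List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
          ```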
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802220

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -267,38 +289,77 @@ public void close() throws Exception {
 	// ------------------------------------------------------------------------

 	@Override
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (lastStateSnapshot == null) {
 			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-
-		if (fetcher == null) {
+		} else if (fetcher == null) {
 			LOG.debug("snapshotState() requested on not yet running source; returning null.");
-			return null;
-		}
-
-		if (!running) {
+		} else if (!running) {
 			LOG.debug("snapshotState() called on closed source; returning null.");
-			return null;
-		}
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotting state ...");
+			}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state ...");
-		}
+			sequenceNumsStateForCheckpoint.clear();
+			lastStateSnapshot = fetcher.snapshotState();

-		lastStateSnapshot = fetcher.snapshotState();
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+			}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
-				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+			}
 		}
+	}

-		return lastStateSnapshot;
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
+			TypeInformation.of(KinesisStreamShard.class),
+			TypeInformation.of(SequenceNumber.class)
+		);
+
+		sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
+			new ListStateDescriptor<>("Kinesis-Stream-Shard-State", tuple));
+
+		if (context.isRestored()) {
+			if (sequenceNumsToRestore == null) {
+				sequenceNumsToRestore = new HashMap<>();
+				for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : sequenceNumsStateForCheckpoint.get()) {
— End diff –

          "kinesisOffset" --> sequence number

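          Applied to the restore loop above, the rename could look like this; note the loop body is an assumption, since the diff is truncated at the `for` line:

          ```
          // Sketch of the suggested rename: the checkpointed value is a Kinesis sequence
          // number, not a Kafka-style offset, so name the loop variable accordingly.
          // The loop body is assumed for illustration (the diff above is truncated).
          for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
          	sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
          }
          ```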
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802181

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -267,38 +289,77 @@ public void close() throws Exception {
 	// ------------------------------------------------------------------------

 	@Override
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (lastStateSnapshot == null) {
 			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-
-		if (fetcher == null) {
+		} else if (fetcher == null) {
 			LOG.debug("snapshotState() requested on not yet running source; returning null.");
-			return null;
-		}
-
-		if (!running) {
+		} else if (!running) {
 			LOG.debug("snapshotState() called on closed source; returning null.");
-			return null;
-		}
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotting state ...");
+			}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state ...");
-		}
+			sequenceNumsStateForCheckpoint.clear();
+			lastStateSnapshot = fetcher.snapshotState();

-		lastStateSnapshot = fetcher.snapshotState();
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+			}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
-				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+			}
 		}
+	}

-		return lastStateSnapshot;
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
+			TypeInformation.of(KinesisStreamShard.class),
+			TypeInformation.of(SequenceNumber.class)
+		);
+
+		sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
+			new ListStateDescriptor<>("Kinesis-Stream-Shard-State", tuple));
— End diff –

          Can we change this state name string to a `static final String`? And add a comment that it must never be changed.

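          A minimal sketch of what that could look like; the constant's name is chosen here for illustration:

          ```
          // Sketch only; the constant name is illustrative.
          /**
           * Name of the operator state that holds the shard-to-sequence-number mapping.
           * The name is part of the checkpoint format: changing it would break restore
           * from existing savepoints, so it must never be changed.
           */
          private static final String SEQUENCE_NUMS_STATE_NAME = "Kinesis-Stream-Shard-State";

          // usage in initializeState():
          sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
          	new ListStateDescriptor<>(SEQUENCE_NUMS_STATE_NAME, tuple));
          ```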
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802168

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -267,38 +289,77 @@ public void close() throws Exception {
 	// ------------------------------------------------------------------------

 	@Override
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (lastStateSnapshot == null) {
 			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-
-		if (fetcher == null) {
+		} else if (fetcher == null) {
 			LOG.debug("snapshotState() requested on not yet running source; returning null.");
-			return null;
-		}
-
-		if (!running) {
+		} else if (!running) {
 			LOG.debug("snapshotState() called on closed source; returning null.");
-			return null;
-		}
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotting state ...");
+			}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state ...");
-		}
+			sequenceNumsStateForCheckpoint.clear();
+			lastStateSnapshot = fetcher.snapshotState();

-		lastStateSnapshot = fetcher.snapshotState();
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+			}

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
-				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+			}
 		}
+	}

-		return lastStateSnapshot;
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
— End diff –

          I would recommend changing the order of the method declarations of `initializeState` and `snapshotState` for a better reading flow.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802060

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
@@ -194,26 +216,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
 		// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
 		// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
 		// can potentially have new shards to subscribe to later on
-		fetcher = new KinesisDataFetcher<>(
-			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+		fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

 		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
 		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

 		// if we are restoring from a checkpoint, we iterate over the restored
 		// state and accordingly seed the fetcher with subscribed shards states
 		if (isRestoringFromFailure) {
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
+			List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
+			for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
 				fetcher.advanceLastDiscoveredShardOfStream(
-					restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
+					shard.getStreamName(), shard.getShard().getShardId());
+
+				SequenceNumber startingStateForNewShard = lastStateSnapshot.containsKey(shard)
+					? lastStateSnapshot.get(shard)
+					: SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
— End diff –

          Okay, I understand now, thanks. I think this case is in general quite complicated to understand without comments and logs for the reader. I would suggest adding some.

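          For instance, the seeding loop could log each decision along these lines (illustrative wording; the exact messages were left open in the review):

          ```
          // Illustrative logging only; the exact wording was left open in the review.
          if (lastStateSnapshot.containsKey(shard)) {
          	LOG.info("Seeding fetcher with restored shard {} at sequence number {}.",
          		shard.getShard().getShardId(), lastStateSnapshot.get(shard));
          } else {
          	LOG.info("Shard {} was created while the consumer was not running; " +
          		"seeding fetcher starting from the EARLIEST sequence number.",
          		shard.getShard().getShardId());
          }
          ```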
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802386

          — Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java —
          @@ -0,0 +1,285 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kinesis;
          +
          +import com.amazonaws.services.kinesis.model.Shard;
          +import org.apache.flink.api.common.functions.RuntimeContext;
          +import org.apache.flink.streaming.api.TimeCharacteristic;
          +import org.apache.flink.streaming.api.functions.source.SourceFunction;
          +import org.apache.flink.streaming.api.operators.StreamSource;
          +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
          +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
          +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
          +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
          +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
          +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
          +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
          +import org.junit.Test;
          +
          +import java.net.URL;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Properties;
          +
          +import static org.junit.Assert.assertNotEquals;
          +import static org.junit.Assert.assertEquals;
          +import static org.mockito.Mockito.mock;
          +
+/**
+ * Tests for checking whether {@link FlinkKinesisConsumer} can restore from snapshots that were
+ * done using the Flink 1.1 {@link FlinkKinesisConsumer}.
+ *
+ * <p>For regenerating the binary snapshot file you have to run the commented out portion
+ * of each test on a checkout of the Flink 1.1 branch.
+ */
+public class FlinkKinesisConsumerMigrationTest {
+
+	@Test
+	public void testRestoreFromFlink11WithEmptyState() throws Exception {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(testConfig);
+
+		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction);
+
+		final AbstractStreamOperatorTestHarness<String> testHarness =
+			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		testHarness.setup();
+		// restore state from binary snapshot file using legacy method
+		testHarness.initializeStateFromLegacyCheckpoint(
+			getResourceFilename("kinesis-consumer-migration-test-flink1.1-snapshot-empty"));
+		testHarness.open();
+
+		// assert that no state was restored
+		assertEquals(null, consumerFunction.getRestoredState());
+
+		consumerOperator.close();
+		consumerOperator.cancel();
+	}
+
+	@Test
+	public void testRestoreFromFlink11() throws Exception {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(testConfig);
+
+		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
+			new StreamSource<>(consumerFunction);
+
+		final AbstractStreamOperatorTestHarness<String> testHarness =
+			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		testHarness.setup();
+		// restore state from binary snapshot file using legacy method
+		testHarness.initializeStateFromLegacyCheckpoint(
+			getResourceFilename("kinesis-consumer-migration-test-flink1.1-snapshot"));
+		testHarness.open();
+
+		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
+		final HashMap<KinesisStreamShard, SequenceNumber> expectedState = new HashMap<>();
+		expectedState.put(new KinesisStreamShard("fakeStream1",
+			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			new SequenceNumber("987654321"));
+
+		// assert that state is correctly restored from legacy checkpoint
+		assertNotEquals(null, consumerFunction.getRestoredState());
+		assertEquals(1, consumerFunction.getRestoredState().size());
+		assertEquals(expectedState, consumerFunction.getRestoredState());
+
+		consumerOperator.close();
+		consumerOperator.cancel();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = FlinkKinesisConsumerMigrationTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		if (resource == null) {
+			throw new NullPointerException("Missing snapshot resource.");
+		}
+		return resource.getFile();
+	}
+
+	private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings("unchecked")
+		DummyFlinkKafkaConsumer(Properties properties) {
+			super("test", mock(KinesisDeserializationSchema.class), properties);
+		}
+
+		@Override
+		protected KinesisDataFetcher<T> createFetcher(List<String> streams,
+				SourceFunction.SourceContext<T> sourceContext,
+				RuntimeContext runtimeContext,
+				Properties configProps,
+				KinesisDeserializationSchema<T> deserializationSchema) {
+			return mock(KinesisDataFetcher.class);
+		}
+	}
+}
+
+/*
          — End diff –

          I think this can be removed, because the legacy checkpoint binary is already maintained by git and should never disappear

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802121

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -194,26 +212,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
 		// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
 		// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
 		// can potentially have new shards to subscribe to later on
-		fetcher = new KinesisDataFetcher<>(
-			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+		fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

 		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
 		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

 		// if we are restoring from a checkpoint, we iterate over the restored
 		// state and accordingly seed the fetcher with subscribed shards states
 		if (isRestoringFromFailure) {
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
+			List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
+			for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
 				fetcher.advanceLastDiscoveredShardOfStream(
-					restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
+					shard.getStreamName(), shard.getShard().getShardId());
+
+				SequenceNumber startingStateForNewShard = lastStateSnapshot.containsKey(shard)
+					? lastStateSnapshot.get(shard)
+					: SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
-				if (LOG.isInfoEnabled()) {
+				if (LOG.isInfoEnabled() && lastStateSnapshot.containsKey(shard)) {

— End diff –

          I think we can integrate the `lastStateSnapshot.containsKey(shard)` check with
          ```
          SequenceNumber startingStateForNewShard = lastStateSnapshot.containsKey(shard)
          ? lastStateSnapshot.get(shard)
          : SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
          ```

          and also add a log for the latter case when the seq num is `SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()`.
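
          For illustration, a sketch of how the two suggestions could be combined inside the loop body (the log messages here are hypothetical wording, not the committed text):

          ```
          SequenceNumber startingStateForNewShard;
          if (lastStateSnapshot.containsKey(shard)) {
              // the shard was already known at checkpoint time; resume from the restored sequence number
              startingStateForNewShard = lastStateSnapshot.get(shard);
              if (LOG.isInfoEnabled()) {
                  LOG.info("Seeding shard {} with restored sequence number {}", shard, startingStateForNewShard);
              }
          } else {
              // the shard was created while the consumer was not running; read it from the earliest record
              startingStateForNewShard = SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
              if (LOG.isInfoEnabled()) {
                  LOG.info("Seeding new shard {} with SENTINEL_EARLIEST_SEQUENCE_NUM", shard);
              }
          }
          ```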

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112802258

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -267,38 +289,77 @@ public void close() throws Exception {
 	// ------------------------------------------------------------------------

 	@Override
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (lastStateSnapshot == null) {
 			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-
-		if (fetcher == null) {
+		} else if (fetcher == null) {
 			LOG.debug("snapshotState() requested on not yet running source; returning null.");
-			return null;
-		}
-
-		if (!running) {
+		} else if (!running) {
 			LOG.debug("snapshotState() called on closed source; returning null.");
-			return null;
-		}
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotting state ...");
+			}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state ...");
-		}
+			sequenceNumsStateForCheckpoint.clear();
+			lastStateSnapshot = fetcher.snapshotState();
-		lastStateSnapshot = fetcher.snapshotState();
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
+			}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
-				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
+			}
 		}
+	}
-		return lastStateSnapshot;
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
+			TypeInformation.of(KinesisStreamShard.class),
+			TypeInformation.of(SequenceNumber.class)
+		);
+
+		sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
+			new ListStateDescriptor<>("Kinesis-Stream-Shard-State", tuple));
+
+		if (context.isRestored()) {
+			if (sequenceNumsToRestore == null) {
+				sequenceNumsToRestore = new HashMap<>();
+				for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : sequenceNumsStateForCheckpoint.get()) {
+					sequenceNumsToRestore.put(kinesisOffset.f0, kinesisOffset.f1);
+				}
+
+				LOG.info("Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {}",
+					sequenceNumsToRestore);
+			} else if (sequenceNumsToRestore.isEmpty()) {
+				sequenceNumsToRestore = null;
+			}
+		} else {
+			LOG.info("No restore state for FlinkKinesisConsumer.");
+		}
 	}

 	@Override
 	public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
-		sequenceNumsToRestore = restoredState;
+		LOG.info("Subtask {} restoring offsets from an older Flink version: {}",
+			getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore);
+
+		sequenceNumsToRestore = restoredState.isEmpty() ? null : restoredState;
+	}
+
+	protected KinesisDataFetcher<T> createFetcher(List<String> streams,

— End diff –

          Maybe a comment that this is exposed for test mocks would be nice.
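
          For illustration, a sketch of such a documented hook, assuming the signature from the diff above (the Javadoc wording is illustrative, not the committed text):

          ```
          /**
           * Creates the fetcher that consumes records from the Kinesis streams.
           *
           * <p>Exposed as protected rather than private so that tests can override it
           * and inject a mock KinesisDataFetcher.
           */
          protected KinesisDataFetcher<T> createFetcher(
                  List<String> streams,
                  SourceFunction.SourceContext<T> sourceContext,
                  RuntimeContext runtimeContext,
                  Properties configProps,
                  KinesisDeserializationSchema<T> deserializationSchema) {
              return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema);
          }
          ```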

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3001#discussion_r112801654

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java —
          @@ -91,6 +104,11 @@

          private volatile boolean running = true;

          + // ------------------------------------------------------------------------
          + // State for Checkpoint
          — End diff –

          nit: missing space before "State"
          The other
          ```
          // ======
          // (space)...
          // ======
          ```

          comment blocks usually have an extra space before the actual block tag
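
          Concretely, following that convention the new block would read, for example:

          ```
          // ------------------------------------------------------------------------
          //  State for Checkpoint
          // ------------------------------------------------------------------------
          ```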

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3001

          @tzulitai

          Thanks for your review and the nice comments. I have already addressed them. =)

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3001

          @tony810430 Thanks for the update!
          I've done a final review + test run; this is good to merge.

          Merging ..

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3001

          Hide
          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Merged for 1.3.0 with https://git-wip-us.apache.org/repos/asf/flink/commit/e5b65a7

          Thank you for the contribution, Wei-Che Wei!


            People

            • Assignee:
              tonywei Wei-Che Wei
            • Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
            • Watchers:
              4
