  2. FLINK-5075

Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.1.4
    • Component/s: Kinesis Connector
    • Labels:
      None

      Description

      A user reported that when our Kinesis connector is used against Kinesalite (https://github.com/mhart/kinesalite), we're incorrectly determining already found shards as newly discovered:
      http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html

      I suspect the problem is that Kinesalite's mock implementation of the Kinesis API doesn't completely match the official AWS Kinesis behaviour.

          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/f5f4f7a
          Resolved for release-1.1 via http://git-wip-us.apache.org/repos/asf/flink/commit/b9e6dcc

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2822

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2822

          Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2822

          Makes sense.

          +1 to this!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2822

          Thanks for the review @StephanEwen. I'm pretty sure this doesn't affect the normal Kinesis shard discovery. I'll give it some final tests before merging (would like to get this in before the next 1.1.4 RC).

          Yes, relying only on `describeStream(streamName)` would be problematic for users with large numbers of shards, because the whole shard list may not fit in the response of a single call. So that's most likely not a solution we can consider.
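
To illustrate why a single call is not enough, here is a minimal sketch of paging through a shard list. It does not use the AWS SDK: `ShardPage`, `ShardPageSource` and `PagedShardLister` are hypothetical stand-ins for one describe-stream response, the call itself and the calling code, and are not taken from the connector.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one response: a page of shard IDs plus a "more pages" flag. */
class ShardPage {
    final List<String> shardIds;
    final boolean hasMore;

    ShardPage(List<String> shardIds, boolean hasMore) {
        this.shardIds = shardIds;
        this.hasMore = hasMore;
    }
}

/** Hypothetical stand-in for the call; a null start ID means "start from the first shard". */
interface ShardPageSource {
    ShardPage fetch(String exclusiveStartShardId);
}

class PagedShardLister {
    /** Collects every shard ID by repeatedly asking for the page after the last ID received. */
    static List<String> listAll(ShardPageSource source) {
        List<String> all = new ArrayList<>();
        String lastId = null;
        ShardPage page;
        do {
            page = source.fetch(lastId);
            all.addAll(page.shardIds);
            if (!page.shardIds.isEmpty()) {
                lastId = page.shardIds.get(page.shardIds.size() - 1);
            }
            // Stop on an empty page as well, so a misbehaving source cannot loop forever.
        } while (page.hasMore && !page.shardIds.isEmpty());
        return all;
    }
}
```

Note that this loop itself depends on the start ID being honoured: a source that ignores it and always returns the first page would produce duplicates here.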

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2822#discussion_r89119126

          --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
          @@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
              * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
              * @return the result of the describe stream operation
              */
          -   private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
          +   private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
          --- End diff --

          Only Kinesis does. Kinesalite incorrectly ignores `startShardId`.
          It's marked `@Nullable` here because on a fresh startup this method is called with `null` as the start ID (at that point there is no shard ID to start from yet).

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2822#discussion_r89100942

          --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
          @@ -283,7 +285,7 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
              * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
              * @return the result of the describe stream operation
              */
          -   private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
          +   private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
          --- End diff --

          Does only Kinesalite supply the `startShardId` parameter?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2822

          I'm not entirely sure whether the fix is a good workaround, or whether we should really bother with Kinesalite's incorrect behaviour at all in our Kinesis connector.

          An alternative I've considered is to always use the `describeStream(streamName)` variant, fetching the whole shard list and explicitly ruling out shards we've already seen. That would keep the code free of such "workarounds", but would be problematic for Kinesis users whose shard count exceeds the largest number of shards the API can return.
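
As a rough sketch of what that alternative could look like (not code from the pull request), the class below remembers every shard ID it has been handed and reports only the unseen ones. `SeenShardTracker` and `discoverNew` are hypothetical names, and shards are reduced to plain ID strings for brevity.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SeenShardTracker {
    // IDs of all shards reported by earlier discovery rounds.
    private final Set<String> seenShardIds = new HashSet<>();

    /** Returns the IDs in fullShardList that were not seen before, and remembers them. */
    List<String> discoverNew(List<String> fullShardList) {
        List<String> newlyDiscovered = new ArrayList<>();
        for (String id : fullShardList) {
            // Set.add returns false when the ID was already recorded.
            if (seenShardIds.add(id)) {
                newlyDiscovered.add(id);
            }
        }
        return newlyDiscovered;
    }
}
```

This approach does not depend on how the service treats a start ID, but the set grows with the stream and the full list has to be fetched on every discovery round.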

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/2822

          FLINK-5075 [kinesis] Make Kinesis consumer fail-proof to incorrect Kinesalite API behaviour

          A user reported that when tested against Kinesalite (a widely used mock Kinesis implementation), the connector was incorrectly determining already seen shards as newly discovered.

          The problem was that the connector uses the API `describeStream(streamName, exclusiveStartShardId)` to fetch only the shards of a stream that come after the provided `exclusiveStartShardId` (the ID of the latest shard we've already discovered), and Kinesalite's behaviour for this call differs from that of the official Kinesis API.

          For example, if the current complete shard list is [shard-0, shard-1, shard-2, shard-3] for "test-stream",
          then `describeStream("test-stream", "shard-1")` should return: [shard-2, shard-3].
          Kinesalite, however, incorrectly returns the whole list.
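
One defensive way to cope with this, sketched here under assumptions rather than quoted from the patch, is to filter the returned list on the caller's side. `ShardListFilter` and `dropAlreadySeen` are hypothetical names, shards are reduced to plain ID strings, and the sketch assumes the IDs are fixed-width so that plain string order matches shard order.

```java
import java.util.ArrayList;
import java.util.List;

class ShardListFilter {
    /**
     * Keeps only the shard IDs that come strictly after lastSeenShardId.
     * A null lastSeenShardId (fresh startup) keeps the whole list.
     */
    static List<String> dropAlreadySeen(List<String> returnedShardIds, String lastSeenShardId) {
        List<String> fresh = new ArrayList<>();
        for (String id : returnedShardIds) {
            // Assumption: IDs are fixed-width, so String order matches shard order.
            if (lastSeenShardId == null || id.compareTo(lastSeenShardId) > 0) {
                fresh.add(id);
            }
        }
        return fresh;
    }
}
```

With the list from the example, filtering the full list against "shard-1" leaves [shard-2, shard-3], and filtering an already correct response changes nothing, so the check is harmless against the real service.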

          I've manually tested this change against Kinesalite, and shard discovery is working normally again.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-5075

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2822.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2822


          commit cbef7eabeae645a1cd3533d3274cdb7491b1a779
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-11-17T06:24:24Z

          FLINK-5075 [kinesis] Make connector fail-proof to incorrect Kinesalite API behaviour



            People

            • Assignee: tzulitai Tzu-Li (Gordon) Tai
            • Reporter: tzulitai Tzu-Li (Gordon) Tai
            • Votes: 0
            • Watchers: 3
