Flink / FLINK-35396

DynamoDB Streams consumer consumes data out of order


Details

    Description

      When we use `FlinkDynamoDBStreamsConsumer` from `flink-connector-aws/flink-connector-kinesis` to consume DynamoDB Streams data, records are consumed out of order.
      The relevant service log output is as follows:

      2024-05-06 00:00:40,639 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 has discovered a new shard StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', shard='{ShardId: shardId-00000001714924828427-d73b6b68,ParentShardId: shardId-00000001714910797443-fb1d3b22,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 1},SequenceNumberRange: {StartingSequenceNumber: 2958376400000000058201168012,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2807
      ......
      ......
      ......
      2024-05-06 00:00:46,729 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 has reached the end of subscribed shard: StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', shard='{ShardId: shardId-00000001714910797443-fb1d3b22,ParentShardId: shardId-00000001714897099372-17932b9a,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 1},SequenceNumberRange: {StartingSequenceNumber: 2955440900000000051102788386,}}'}
      

      The sequence of events appears to be:
      `2024-05-06 00:00:40,639`: a new child shard (ShardId: shardId-00000001714924828427-d73b6b68) is discovered, and consumption of it starts immediately.
      `2024-05-06 00:00:46,729`: consumption of the old parent shard (ShardId: shardId-00000001714910797443-fb1d3b22) reaches the end of that shard.

      There is a gap of about 6 seconds between these two events. In other words, the child shard starts being consumed before consumption of its parent shard has finished, so the records we read are sent downstream out of order.
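      To make the ordering requirement concrete: for DynamoDB Streams, every record from a parent shard should reach downstream before any record from its child shard. The sketch below checks that property over a list of shard IDs in emission order. `LineageOrderCheck`, `parentBeforeChild` and the `parentOf` map are hypothetical helpers for illustration only, not part of `flink-connector-kinesis`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not connector code): checks parent-before-child emission order.
final class LineageOrderCheck {
    // Returns true if no record from a child shard is emitted before the last
    // record of its parent shard. parentOf maps child shard id -> parent shard id.
    static boolean parentBeforeChild(List<String> emittedShardIds, Map<String, String> parentOf) {
        // Position of the last record emitted from each shard.
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < emittedShardIds.size(); i++) {
            lastIndex.put(emittedShardIds.get(i), i);
        }
        for (int i = 0; i < emittedShardIds.size(); i++) {
            String parent = parentOf.get(emittedShardIds.get(i));
            Integer parentLast = parent == null ? null : lastIndex.get(parent);
            // A child record that precedes the parent's final record is out of order.
            if (parentLast != null && i < parentLast) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> parentOf = Map.of("child", "parent");
        // Interleaving like the one in the log: the child starts before the parent ends.
        System.out.println(parentBeforeChild(List.of("parent", "child", "parent"), parentOf)); // false
        System.out.println(parentBeforeChild(List.of("parent", "parent", "child"), parentOf)); // true
    }
}
```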

      https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L689-L740

      Is this because the code submits a `ShardConsumer` to `shardConsumersExecutor` as soon as `discoverNewShards` finds a new shard? `shardConsumersExecutor` is created with `Executors.newCachedThreadPool()`, which does not limit the number of threads. As a result, the new (child) and old (parent) shards are consumed at the same time, and the data is consumed out of order.
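      One possible direction, sketched under stated assumptions: instead of limiting the thread pool, chain each child shard's consumer onto the completion of its parent's consumer. This way an unbounded `Executors.newCachedThreadPool()` can still be used without the child overtaking the parent. `ParentFirstScheduler` and `submit` are hypothetical names, and the `Runnable` stands in for a `ShardConsumer`. This is not the connector's current behavior.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a child shard's consumer only starts after its parent's consumer finished.
final class ParentFirstScheduler {
    private final ExecutorService executor;
    // One future per shard id; it completes once that shard has been read to its end.
    private final Map<String, CompletableFuture<Void>> finished = new ConcurrentHashMap<>();

    ParentFirstScheduler(ExecutorService executor) {
        this.executor = executor;
    }

    private CompletableFuture<Void> finishedFuture(String shardId) {
        return finished.computeIfAbsent(shardId, id -> new CompletableFuture<>());
    }

    // Runs consumer on the executor once the parent shard is finished (parentShardId == null: no parent).
    CompletableFuture<Void> submit(String shardId, String parentShardId, Runnable consumer) {
        CompletableFuture<Void> self = finishedFuture(shardId);
        CompletableFuture<Void> parentDone = parentShardId == null
                ? CompletableFuture.completedFuture(null)
                : finishedFuture(parentShardId);
        parentDone.thenRunAsync(consumer, executor).whenComplete((ignored, error) -> {
            if (error == null) {
                self.complete(null);
            } else {
                self.completeExceptionally(error);
            }
        });
        return self;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool(); // unbounded, as in the report
        ParentFirstScheduler scheduler = new ParentFirstScheduler(pool);
        List<String> emitted = new CopyOnWriteArrayList<>();
        // The child is submitted first, as when it is discovered before the parent ends.
        CompletableFuture<Void> child = scheduler.submit("child", "parent", () -> emitted.add("child-record"));
        scheduler.submit("parent", null, () -> {
            sleepQuietly(50); // simulate a slow parent read
            emitted.add("parent-record");
        });
        child.join();
        pool.shutdown();
        System.out.println(emitted); // [parent-record, child-record]
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

      A real change would also need to handle a parent that finished before a restore from a checkpoint, a parent owned by a different subtask, or a parent that has aged out of the stream's retention. In any of those cases this sketch's child future would wait forever.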
      `flink-connector-kinesis` relies on `dynamodb-streams-kinesis-adapter` to read records from the DynamoDB stream. So why does consuming the stream directly with `dynamodb-streams-kinesis-adapter` not show a similar problem?
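      For comparison, as far as we understand, the KCL 1.x worker that `dynamodb-streams-kinesis-adapter` is normally used with runs a `BlockOnParentShardTask` before processing a shard. That task waits until each parent shard's lease is checkpointed at `SHARD_END`. The simplified imitation below uses an in-memory checkpoint map. `BlockOnParents`, `checkpoint` and `awaitParents` are hypothetical names, not KCL APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified imitation of a "block on parent shard" step (not KCL code).
final class BlockOnParents {
    static final String SHARD_END = "SHARD_END";

    // shardId -> latest checkpoint, written by whichever consumer owns that shard.
    private final Map<String, String> checkpoints = new ConcurrentHashMap<>();

    void checkpoint(String shardId, String checkpointValue) {
        checkpoints.put(shardId, checkpointValue);
    }

    // True once every parent shard has been checkpointed at SHARD_END.
    boolean parentsFinished(List<String> parentShardIds) {
        for (String parent : parentShardIds) {
            if (!SHARD_END.equals(checkpoints.get(parent))) {
                return false;
            }
        }
        return true;
    }

    // Polls until all parents are finished; returns false on timeout or interruption.
    boolean awaitParents(List<String> parentShardIds, long pollMillis, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!parentsFinished(parentShardIds)) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BlockOnParents gate = new BlockOnParents();
        List<String> parents = List.of("parent");
        gate.checkpoint("parent", "12345"); // hypothetical mid-shard checkpoint
        System.out.println(gate.awaitParents(parents, 10, 50)); // false: parent not finished yet
        gate.checkpoint("parent", SHARD_END);
        System.out.println(gate.awaitParents(parents, 10, 50)); // true: child may start now
    }
}
```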


          People

            Assignee: Unassigned
            Reporter: Suxing Lee
            Votes: 0
            Watchers: 2
