Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 1.16.2, aws-connector-4.2.0
Description
When we use `FlinkDynamoDBStreamsConsumer` in `flink-connector-aws/flink-connector-kinesis` to consume DynamoDB stream data, records are emitted out of order.
The relevant service log is as follows:
```
2024-05-06 00:00:40,639 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 has discovered a new shard StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', shard='{ShardId: shardId-00000001714924828427-d73b6b68, ParentShardId: shardId-00000001714910797443-fb1d3b22, HashKeyRange: {StartingHashKey: 0, EndingHashKey: 1}, SequenceNumberRange: {StartingSequenceNumber: 2958376400000000058201168012,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2807
......
2024-05-06 00:00:46,729 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 has reached the end of subscribed shard: StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', shard='{ShardId: shardId-00000001714910797443-fb1d3b22, ParentShardId: shardId-00000001714897099372-17932b9a, HashKeyRange: {StartingHashKey: 0, EndingHashKey: 1}, SequenceNumberRange: {StartingSequenceNumber: 2955440900000000051102788386,}}'}
```
The failure sequence appears to be:
`2024-05-06 00:00:40,639`: the new child shard (ShardId: shardId-00000001714924828427-d73b6b68) is discovered and consumption of it starts immediately.
`2024-05-06 00:00:46,729`: consumption of the old parent shard (ShardId: shardId-00000001714910797443-fb1d3b22) finishes.
There is a gap of about 6 seconds: the child shard started being consumed before the parent shard's data had been fully consumed. As a result, the records we read are sent downstream out of order.
This seems to be because `KinesisDataFetcher` submits a `ShardConsumer` to `shardConsumersExecutor` as soon as `discoverNewShards` finds a new shard, and `shardConsumersExecutor` is created via `Executors.newCachedThreadPool()`, which neither limits the number of threads nor enforces any parent-before-child ordering. New and old shards are therefore consumed at the same time, which would explain the out-of-order data (see the sketch below).
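A minimal, self-contained sketch of this pattern (illustrative code only, not the connector's actual implementation; the shard names and timings are made up):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: every newly discovered shard is handed to an unbounded
// cached thread pool immediately, so a child shard can run concurrently with
// its still-active parent.
public class UnorderedShardConsumptionSketch {

    // Like shardConsumersExecutor: no thread limit, no ordering guarantees.
    private static final ExecutorService shardConsumersExecutor =
            Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException {
        // The parent shard is still draining its backlog...
        shardConsumersExecutor.submit(() -> consumeShard("parent-shard", 6_000));

        // ...when shard discovery finds the child and submits it right away.
        // Nothing waits for the parent to reach its shard end first, so the
        // child finishes (and emits records) before the parent does.
        shardConsumersExecutor.submit(() -> consumeShard("child-shard", 1_000));

        shardConsumersExecutor.shutdown();
        shardConsumersExecutor.awaitTermination(1, TimeUnit.MINUTES);
    }

    private static void consumeShard(String shardId, long workMillis) {
        System.out.println(shardId + " started");
        try {
            Thread.sleep(workMillis); // stands in for reading records
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println(shardId + " finished");
    }
}
```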
`flink-connector-kinesis` relies on `dynamodb-streams-kinesis-adapter` to subscribe to records from the DynamoDB stream. Why does consuming data directly through `dynamodb-streams-kinesis-adapter` not exhibit the same problem?
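One plausible explanation (an assumption, not verified against the adapter's source): the KCL, which `dynamodb-streams-kinesis-adapter` builds on, tracks shard lineage and only begins processing a child shard after its parent has been consumed to shard end. A hypothetical sketch of such parent-before-child gating follows; every name here is illustrative, and none of this is the adapter's or the KCL's actual API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical parent-before-child gating sketch.
public class OrderedShardGateSketch {

    // One latch per shard, released once that shard is consumed to its end.
    private final Map<String, CountDownLatch> shardDone = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public void startShardConsumer(String shardId, String parentShardId, Runnable consumeToEnd) {
        shardDone.computeIfAbsent(shardId, k -> new CountDownLatch(1));
        executor.submit(() -> {
            try {
                if (parentShardId != null) {
                    // Block until the parent shard has been fully consumed, so
                    // child records are never emitted before parent records.
                    shardDone.computeIfAbsent(parentShardId, k -> new CountDownLatch(1)).await();
                }
                consumeToEnd.run();                 // read records until shard end
                shardDone.get(shardId).countDown(); // now children may start
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
```

With a gate like this, the child consumer discovered at 00:00:40 would have blocked until the parent reached shard end at 00:00:46, preserving ordering.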