Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
1.15.1, 1.16.0, 1.17.0
Description
Similar to the issue described in https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of committables the checkpointId of the subtaskCommittables is always set to 1 (https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193), while the CheckpointCommittableManager that holds them is initialized with the checkpointId that was written into state (https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155).
As a result, during recovery the post-commit topology receives a `CommittableSummary` carrying the recovered checkpoint id, followed by multiple `CommittableWithLineage`s carrying the reset checkpointId. These `CommittableWithLineage`s are orphaned because no `CommittableSummary` matches their checkpoint id, and the job fails.
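The mismatch can be illustrated with a small standalone sketch. It does not use Flink's classes: `Summary`, `WithLineage`, `recover`, and `findOrphans` are hypothetical stand-ins, and the checkpoint id 42 is an arbitrary example value. It only assumes that a downstream consumer matches each committable to a summary by checkpoint id.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of the problem; these are NOT Flink's real classes.
final class OrphanedCommittablesSketch {

    /** Stand-in for a committable summary: announces committables for one checkpoint. */
    static final class Summary {
        final long checkpointId;
        final int numberOfCommittables;

        Summary(long checkpointId, int numberOfCommittables) {
            this.checkpointId = checkpointId;
            this.numberOfCommittables = numberOfCommittables;
        }
    }

    /** Stand-in for a committable with lineage: one committable tagged with a checkpoint id. */
    static final class WithLineage {
        final long checkpointId;
        final String committable;

        WithLineage(long checkpointId, String committable) {
            this.checkpointId = checkpointId;
            this.committable = committable;
        }
    }

    /** Models recovery: every committable is tagged with the given subtask-level checkpoint id. */
    static List<WithLineage> recover(List<String> payloads, long subtaskCheckpointId) {
        List<WithLineage> recovered = new ArrayList<>();
        for (String payload : payloads) {
            recovered.add(new WithLineage(subtaskCheckpointId, payload));
        }
        return recovered;
    }

    /** Returns the committables whose checkpoint id was never announced by any summary. */
    static List<WithLineage> findOrphans(List<Summary> summaries, List<WithLineage> committables) {
        Set<Long> announcedIds = new HashSet<>();
        for (Summary summary : summaries) {
            announcedIds.add(summary.checkpointId);
        }
        List<WithLineage> orphans = new ArrayList<>();
        for (WithLineage committable : committables) {
            if (!announcedIds.contains(committable.checkpointId)) {
                orphans.add(committable);
            }
        }
        return orphans;
    }

    public static void main(String[] args) {
        long recoveredCheckpointId = 42L; // hypothetical id restored from state
        List<String> payloads = List.of("file-a", "file-b");
        List<Summary> summaries =
                List.of(new Summary(recoveredCheckpointId, payloads.size()));

        // Behaviour described in this ticket: the subtask-level id is reset to 1.
        int orphansWithResetId = findOrphans(summaries, recover(payloads, 1L)).size();
        // Consistent behaviour: the subtask-level id equals the id written into state.
        int orphansWithRestoredId =
                findOrphans(summaries, recover(payloads, recoveredCheckpointId)).size();

        System.out.println("reset id: " + orphansWithResetId + " orphan(s)");
        System.out.println("restored id: " + orphansWithRestoredId + " orphan(s)");
    }
}
```

In this model, the reset id leaves both committables without a matching summary (2 orphans), while reusing the restored id leaves none.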
Issue Links
- relates to FLINK-29459 Sink v2 has bugs in supporting legacy v1 implementations with global committer (Open)
- links to