Flink / FLINK-29512

Align SubtaskCommittableManager checkpointId with CheckpointCommittableManagerImpl checkpointId during recovery


Details

    Description

      Similar to the issue described in https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of committables the checkpointId of each SubtaskCommittableManager is always set to 1 (https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193), while the enclosing CheckpointCommittableManager is initialized with the checkpointId that was written into state (https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155).
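      The mismatch can be illustrated with a deliberately simplified model. The classes and methods below (`RecoverySketch`, `SubtaskManager`, `CheckpointManager`, `restoreBuggy`, `restoreAligned`, `isAligned`) are hypothetical stand-ins, not Flink's API; they assume only that each subtask manager is nested inside a checkpoint manager and should carry the same checkpointId. One plausible shape of the fix, sketched in `restoreAligned`, is to pass the restored checkpointId down when the subtask managers are deserialized; the actual patch may differ.

```java
import java.util.List;

// Hypothetical, simplified model of the recovery path described above.
// These records are NOT Flink classes; they only mirror the nesting of
// subtask managers inside a per-checkpoint manager.
public class RecoverySketch {

    // Stand-in for a per-subtask committable manager.
    record SubtaskManager(int subtaskId, long checkpointId, List<String> committables) {}

    // Stand-in for the per-checkpoint manager that holds the subtask managers.
    record CheckpointManager(long checkpointId, List<SubtaskManager> subtasks) {}

    // Mirrors the reported behaviour: the subtask checkpointId is hard-coded to 1,
    // while the enclosing manager uses the checkpointId read from state.
    static CheckpointManager restoreBuggy(long storedCheckpointId, int subtaskId, List<String> committables) {
        SubtaskManager subtask = new SubtaskManager(subtaskId, 1L, committables);
        return new CheckpointManager(storedCheckpointId, List.of(subtask));
    }

    // Aligned variant: the subtask inherits the checkpointId of its enclosing manager.
    static CheckpointManager restoreAligned(long storedCheckpointId, int subtaskId, List<String> committables) {
        SubtaskManager subtask = new SubtaskManager(subtaskId, storedCheckpointId, committables);
        return new CheckpointManager(storedCheckpointId, List.of(subtask));
    }

    // True if every subtask manager carries the same checkpointId as its enclosing manager.
    static boolean isAligned(CheckpointManager manager) {
        return manager.subtasks().stream()
                .allMatch(s -> s.checkpointId() == manager.checkpointId());
    }

    public static void main(String[] args) {
        List<String> committables = List.of("file-a", "file-b");
        System.out.println(isAligned(restoreBuggy(42L, 0, committables)));   // false
        System.out.println(isAligned(restoreAligned(42L, 0, committables))); // true
    }
}
```

      Note that the hard-coded value only hides the problem when the restored checkpointId happens to be 1; for any other id the nested managers disagree.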

       

      As a result, during recovery the post-commit topology receives a `CommittableSummary` carrying the recovered checkpoint id and multiple `CommittableWithLineage`s carrying the reset checkpointId. These `CommittableWithLineage`s are orphaned, because no `CommittableSummary` exists for their checkpointId, and the job fails.
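      A minimal sketch of why the mismatch fails the job, assuming the post-commit topology pairs each committable with a summary by checkpoint id alone (the real operator may also key on other fields, such as the subtask id). `OrphanCheckSketch`, `Summary`, `Committable`, `findOrphans` and `process` are hypothetical names, not Flink classes; `Summary` and `Committable` merely stand in for `CommittableSummary` and `CommittableWithLineage`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a downstream operator matching committables to summaries.
// Summary and Committable only stand in for CommittableSummary and
// CommittableWithLineage; they are NOT Flink's classes.
public class OrphanCheckSketch {

    record Summary(long checkpointId, int numberOfCommittables) {}

    record Committable(long checkpointId, String payload) {}

    // Returns every committable whose checkpointId has no matching summary.
    static List<Committable> findOrphans(List<Summary> summaries, List<Committable> committables) {
        Set<Long> summarized = new HashSet<>();
        for (Summary s : summaries) {
            summarized.add(s.checkpointId());
        }
        List<Committable> orphans = new ArrayList<>();
        for (Committable c : committables) {
            if (!summarized.contains(c.checkpointId())) {
                orphans.add(c);
            }
        }
        return orphans;
    }

    // Mimics the job failure: an orphaned committable is treated as an illegal state.
    static void process(List<Summary> summaries, List<Committable> committables) {
        List<Committable> orphans = findOrphans(summaries, committables);
        if (!orphans.isEmpty()) {
            throw new IllegalStateException("Committables without a summary: " + orphans);
        }
    }

    public static void main(String[] args) {
        // The summary carries the checkpointId restored from state.
        List<Summary> summaries = List.of(new Summary(42L, 2));
        // Buggy recovery: the committables carry the reset checkpointId 1.
        List<Committable> buggy = List.of(new Committable(1L, "file-a"), new Committable(1L, "file-b"));
        // Aligned recovery: the committables carry the restored checkpointId 42.
        List<Committable> aligned = List.of(new Committable(42L, "file-a"), new Committable(42L, "file-b"));

        System.out.println(findOrphans(summaries, buggy).size());   // 2
        System.out.println(findOrphans(summaries, aligned).size()); // 0
    }
}
```

      With the reset checkpointId of 1 both committables are orphaned and `process` throws; once the subtask checkpointId matches the restored id, every committable finds its summary.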


            People

              Assignee: Fabian Paul (fpaul)
              Reporter: Fabian Paul (fpaul)
              Votes: 0
              Watchers: 2
