Flink / FLINK-20654

Unaligned checkpoint recovery may lead to corrupted data stream

    Details

    • Release Note:
      Using unaligned checkpoints in Flink 1.12.0 in combination with two-input or multiple-input tasks, or with union inputs for single-input tasks, can result in corrupted state.

      This can happen if a new checkpoint is triggered before recovery has fully completed. For state to be corrupted, a task with two or more input gates must receive a checkpoint barrier at exactly the moment the task finishes recovering spilled in-flight data. In that case the new checkpoint can succeed with corrupted or missing in-flight data, which results in various deserialization or corrupted-data-stream errors when someone attempts to recover from that checkpoint.

      With unaligned checkpoints in Flink 1.12.1, corruption may occur in the checkpoint that follows a declined checkpoint.

      A late barrier of a canceled checkpoint may cause buffers not to be written into the subsequent checkpoint, so that recovery is not possible. This happens when the next checkpoint barrier arrives at a given operator before all previous barriers have arrived, which can only happen after a cancellation in unaligned checkpoints.

      Description

      The fix for FLINK-20433 exposes potential corruption after recovery in all variations of UnalignedCheckpointITCase.

      To reproduce, run UnalignedCheckpointITCase (UCITCase) a couple hundred times. The issue showed up for me in the following cases, listed in order of decreasing frequency:

      • execute [Parallel union, p = 5]
      • execute [Parallel union, p = 10]
      • execute [Parallel cogroup, p = 5]
      • execute [parallel pipeline with remote channels, p = 5]
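
For illustration only, the "run it a couple hundred times" step can be sketched as a small plain-Java loop that repeats a run and tallies how each failure surfaces. This is not how the Flink build reruns tests; `RepeatHarness`, `tally`, and the `fakeRun` stand-in are hypothetical names, and the simulated failures are hard-coded rather than taken from a real test run.

```java
import java.io.EOFException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;

class RepeatHarness {
    /** Runs the task the given number of times and counts failures by exception class name. */
    static Map<String, Integer> tally(Callable<?> task, int runs) {
        Map<String, Integer> failures = new TreeMap<>();
        for (int i = 0; i < runs; i++) {
            try {
                task.call();
            } catch (Throwable t) {
                // Catch Throwable so that AssertionError (an Error) is counted too.
                failures.merge(t.getClass().getSimpleName(), 1, Integer::sum);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a single test run; it fails on fixed iterations
        // so that the tally is deterministic.
        int[] counter = {0};
        Callable<Void> fakeRun = () -> {
            int i = counter[0]++;
            if (i % 50 == 7) {
                throw new EOFException("simulated");
            }
            if (i % 100 == 13) {
                throw new ArithmeticException("simulated overflow");
            }
            return null;
        };
        System.out.println(tally(fakeRun, 200));
    }
}
```

Grouping by exception type makes it easier to see which symptom dominates for a given test variation when the failure is rare.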

      The issue manifests in one of the following ways:

      • stream corrupted exception
      • EOF exception
      • assertion failure in NUM_LOST or NUM_OUT_OF_ORDER
      • (for union) ArithmeticException overflow, because a number that should be in [0;100000] has been mis-deserialized
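
As a toy illustration of why missing in-flight data can surface as these symptoms, the sketch below writes a few fixed-width longs, drops some bytes from the middle, and reads the result back. It uses plain `java.io` streams, not Flink's serializers or buffer format; `CorruptedStreamDemo` and its helpers are hypothetical names, and the byte offsets are arbitrary.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class CorruptedStreamDemo {
    /** Serializes the values as consecutive 8-byte big-endian longs. */
    static byte[] serialize(long[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (long v : values) {
                out.writeLong(v);
            }
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns a copy of data with count bytes removed starting at offset (simulated lost data). */
    static byte[] dropBytes(byte[] data, int offset, int count) {
        byte[] result = new byte[data.length - count];
        System.arraycopy(data, 0, result, 0, offset);
        System.arraycopy(data, offset + count, result, offset, data.length - offset - count);
        return result;
    }

    /** Reads count longs; throws EOFException if the data ends in the middle of a record. */
    static List<Long> readLongs(byte[] data, int count) throws IOException {
        List<Long> values = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < count; i++) {
                values.add(in.readLong());
            }
        }
        return values;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = serialize(new long[] {1L, 2L, 3L, 4L});
        // Drop 3 bytes inside the second record, so later reads are misaligned.
        byte[] corrupted = dropBytes(data, 8, 3);

        // The first record is intact; the second is read across a record boundary.
        System.out.println(readLongs(corrupted, 2));

        try {
            readLongs(corrupted, 4);
        } catch (EOFException e) {
            System.out.println("EOF while reading a partial record");
        }
    }
}
```

Once the reader is misaligned, every following value is assembled from parts of two neighbouring records, so a small counter can come back far outside its expected range, and the final record is cut short.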

      People

      • Assignee: Unassigned
      • Reporter: Arvid Heise (AHeise)