Flink / FLINK-2491 Support Checkpoints After Tasks Finished / FLINK-24068

CheckpointBarrierHandler may skip markAlignmentStart for an alignment-with-timeout checkpoint

Details

    Description

      04:51:11,727 [Flat Map -> Sink: Unnamed (10/12)#1] WARN org.apache.flink.runtime.taskmanager.Task [] - Flat Map -> Sink: Unnamed (10/12)#1 (0d965fd3c0de11dc7fb6793435cda8ba) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Alignment time is less than zero({}). Is the time monotonic? [-9223369873401849363]
       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
       at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:181)
       at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.markAlignmentEnd(CheckpointBarrierHandler.java:177)
       at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:253)
       at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:240)
       at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
       at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
       at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490)
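
      The logged duration, -9223369873401849363, sits just above Long.MIN_VALUE. That is consistent with markAlignmentEnd subtracting a start timestamp that was never set for this checkpoint, so the long subtraction wraps around. The sketch below reproduces the number under two assumptions that are not confirmed by this report: that the unset start timestamp holds a Long.MIN_VALUE sentinel (ASSUMED_OUTSIDE_OF_ALIGNMENT is a hypothetical name), and that the clock read 2163452926445 ns (a value back-computed from the log line, not taken from it).

```java
// Hypothetical reproduction of the logged alignment time.
// Assumption: an unset alignment start is represented by Long.MIN_VALUE.
public class NegativeAlignmentTime {
    static final long ASSUMED_OUTSIDE_OF_ALIGNMENT = Long.MIN_VALUE;

    // Plain long subtraction: silently wraps around on overflow.
    static long alignmentDuration(long nowNanos, long startOfAlignmentNanos) {
        return nowNanos - startOfAlignmentNanos;
    }

    public static void main(String[] args) {
        // Clock reading back-computed from the logged value (assumption).
        long nowNanos = 2_163_452_926_445L;
        long duration = alignmentDuration(nowNanos, ASSUMED_OUTSIDE_OF_ALIGNMENT);
        System.out.println(duration); // prints -9223369873401849363
    }
}
```

      Mathematically the result is nowNanos + 2^63, which does not fit in a long and wraps to nowNanos - 2^63; the Preconditions.checkState guard in markAlignmentEnd is what turns this into the IllegalStateException.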
      

      This is caused by the following sequence:

      1. SingleCheckpointBarrierHandler first receives a BarrierAnnouncement, which calls checkNewCheckpoint to reset currentCheckpointId and alignedChannels.
      2. SingleCheckpointBarrierHandler then receives an EndOfPartition, which adds the channel to alignedChannels.
      3. SingleCheckpointBarrierHandler then receives a barrier, finds that alignedChannels is already non-empty, and therefore skips markAlignmentStart. When alignment later completes, markAlignmentEnd computes the duration from a start timestamp that was never set for this checkpoint, which fails the check shown above.
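
      The three steps can be replayed against a heavily simplified, hypothetical model of the handler. This is not Flink's SingleCheckpointBarrierHandler: the class, the NOT_ALIGNING sentinel, the onXxx methods, and the checkpoint/channel numbers are all illustrative assumptions; only the field names currentCheckpointId, alignedChannels, and startOfAlignmentTimestamp mirror the report and the stack trace.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the sequence in the issue (not Flink code).
public class SkippedAlignmentStart {
    // Assumed sentinel meaning "alignment start not recorded".
    static final long NOT_ALIGNING = Long.MIN_VALUE;

    long currentCheckpointId = -1;
    final Set<Integer> alignedChannels = new HashSet<>();
    long startOfAlignmentTimestamp = NOT_ALIGNING;
    boolean alignmentStartMarked = false;

    // Step 1: an announcement for a newer checkpoint resets per-checkpoint state.
    void onBarrierAnnouncement(long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId;
            alignedChannels.clear();
        }
    }

    // Step 2: EndOfPartition counts the channel as aligned.
    void onEndOfPartition(int channel) {
        alignedChannels.add(channel);
    }

    // Step 3 (buggy condition): the start is recorded only if no channel is aligned yet.
    void onBarrier(int channel, long nowNanos) {
        if (alignedChannels.isEmpty()) {
            startOfAlignmentTimestamp = nowNanos; // stands in for markAlignmentStart
            alignmentStartMarked = true;
        }
        alignedChannels.add(channel);
    }

    // Stands in for the duration computed by markAlignmentEnd.
    long alignmentDuration(long nowNanos) {
        return nowNanos - startOfAlignmentTimestamp;
    }

    public static void main(String[] args) {
        SkippedAlignmentStart h = new SkippedAlignmentStart();
        h.onBarrierAnnouncement(7); // step 1
        h.onEndOfPartition(0);      // step 2
        h.onBarrier(1, 1_000L);     // step 3: channel 0 is already in alignedChannels
        System.out.println(h.alignmentStartMarked);          // prints false
        System.out.println(h.alignmentDuration(2_000L) < 0); // prints true
    }
}
```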

      A possible fix is to change step 3 so that it checks whether this is the first barrier received for the checkpoint, instead of whether alignedChannels is empty.
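
      A minimal sketch of that idea, again on a hypothetical simplified model rather than Flink's actual handler: a per-checkpoint counter (barriersReceived, an illustrative name) decides when to record the alignment start, so an EndOfPartition that arrived earlier no longer suppresses it. The real fix may track "first barrier" differently.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed fix (not Flink code): mark the alignment start
// on the first barrier of the checkpoint, regardless of EndOfPartition events.
public class FirstBarrierAlignmentStart {
    static final long NOT_ALIGNING = Long.MIN_VALUE; // assumed sentinel

    long currentCheckpointId = -1;
    final Set<Integer> alignedChannels = new HashSet<>();
    int barriersReceived = 0; // hypothetical per-checkpoint counter
    long startOfAlignmentTimestamp = NOT_ALIGNING;

    void onBarrierAnnouncement(long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId;
            alignedChannels.clear();
            barriersReceived = 0; // reset together with the other per-checkpoint state
        }
    }

    void onEndOfPartition(int channel) {
        alignedChannels.add(channel); // aligned, but not counted as a received barrier
    }

    void onBarrier(int channel, long nowNanos) {
        if (barriersReceived == 0) {
            startOfAlignmentTimestamp = nowNanos; // stands in for markAlignmentStart
        }
        barriersReceived++;
        alignedChannels.add(channel);
    }

    long alignmentDuration(long nowNanos) {
        return nowNanos - startOfAlignmentTimestamp;
    }

    public static void main(String[] args) {
        FirstBarrierAlignmentStart h = new FirstBarrierAlignmentStart();
        h.onBarrierAnnouncement(7);
        h.onEndOfPartition(0);
        h.onBarrier(1, 1_000L);
        System.out.println(h.alignmentDuration(2_000L)); // prints 1000
    }
}
```

      A counter (rather than a boolean) also leaves room for later checks such as "all expected barriers received", but a boolean flag reset in the same place would be enough for the start-of-alignment decision.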


          People

            Assignee: gaoyunhaii Yun Gao
            Reporter: gaoyunhaii Yun Gao
            Votes: 0
            Watchers: 2
