FLINK-20097

Race conditions in InputChannel.ChannelStatePersister

    Description

      In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier() update pendingCheckpointBarrierId unconditionally, potentially overwriting a newer id (or the BARRIER_RECEIVED value) with an older one.

      For stopPersisting(), consider this case:

      1. Two consecutive UC (unaligned checkpoint) barriers arrive at the same channel (the 1st becoming stale at some point)
      2. In RemoteInputChannel.onBuffer, the netty thread updates pendingCheckpointBarrierId to BARRIER_RECEIVED
      3. The task thread processes the 1st barrier and triggers a checkpoint
      4. The task thread processes the 2nd barrier and aborts the 1st checkpoint, calling stopPersisting() from the UC controller and setting pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
      5. The task thread starts the 2nd checkpoint and calls startPersisting(), setting pendingCheckpointBarrierId to 2
      6. New buffers now have a chance to be included in the 2nd checkpoint (though they belong to the next one)
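
      The sequence above can be replayed on a single thread with a small model. This is only a sketch: the class name, the sentinel values and the guard in startPersisting() are assumptions made for illustration, not the actual Flink code.

```java
// Simplified, hypothetical model of the persister state; not the Flink class.
final class StopPersistingRaceModel {
    // Sentinel values are assumptions chosen for this sketch.
    static final long CHECKPOINT_COMPLETED = -1L;
    static final long BARRIER_RECEIVED = -2L;

    private long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;

    // Netty thread: a barrier was seen on the channel.
    void onBarrier() {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    // Task thread: a checkpoint starts; assumed to be skipped if the barrier is already in.
    void startPersisting(long barrierId) {
        if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
            pendingCheckpointBarrierId = barrierId;
        }
    }

    // Task thread: unconditional reset, which is the problem described above.
    void stopPersisting() {
        pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    }

    boolean isPersisting() {
        return pendingCheckpointBarrierId >= 0;
    }

    // Replays the interleaving from the list above.
    static boolean replay() {
        StopPersistingRaceModel channel = new StopPersistingRaceModel();
        channel.onBarrier();          // barrier 1 arrives
        channel.onBarrier();          // barrier 2 arrives
        channel.startPersisting(1L);  // checkpoint 1 triggered, state stays BARRIER_RECEIVED
        channel.stopPersisting();     // checkpoint 1 aborted, BARRIER_RECEIVED is lost
        channel.startPersisting(2L);  // checkpoint 2 started, state becomes 2
        return channel.isPersisting();
    }
}
```

      In this model replay() ends with the channel persisting again, even though barrier 2 has already arrived on it.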

       

      For checkForBarrier(), consider an input gate with two channels A and B and two barriers 1 and 2:

      1. Channel A receives both barriers, channel B receives nothing yet
      2. The task thread processes both barriers on A, eventually triggering the 2nd checkpoint
      3. Channel A's state is now BARRIER_RECEIVED, channel B's is pending (with id=2)
      4. Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
      5. No buffers in B between barriers 1 and 2 will be included in the checkpoint, although they should be
      6. Channel B receives the 2nd barrier, which will eventually conclude the checkpoint
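
      A minimal sketch of channel B in this scenario, again using a hypothetical model rather than the real class. Buffers are plain strings here and the sentinel values are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of channel B; names and sentinel values are illustrative only.
final class UnguardedChannelModel {
    static final long CHECKPOINT_COMPLETED = -1L;
    static final long BARRIER_RECEIVED = -2L;

    private long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    final List<String> persisted = new ArrayList<>();

    void startPersisting(long barrierId) {
        pendingCheckpointBarrierId = barrierId;
    }

    // Ignores which barrier arrived: a stale barrier 1 also ends persisting for checkpoint 2.
    void checkForBarrier(long barrierId) {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    // Buffers are recorded only while a checkpoint id is pending.
    void maybePersist(String buffer) {
        if (pendingCheckpointBarrierId >= 0) {
            persisted.add(buffer);
        }
    }

    // Replays the events seen by channel B in the list above.
    static List<String> replayChannelB() {
        UnguardedChannelModel b = new UnguardedChannelModel();
        b.startPersisting(2L);   // checkpoint 2 triggered via channel A
        b.checkForBarrier(1L);   // stale barrier 1 arrives on B
        b.maybePersist("b1");    // buffers between barriers 1 and 2
        b.maybePersist("b2");
        b.checkForBarrier(2L);   // barrier 2 arrives on B
        return b.persisted;
    }
}
```

      In this model replayChannelB() returns an empty list: both buffers that precede barrier 2 are dropped from the checkpoint.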

       

      One solution I see is to perform the update only if the passed checkpointId >= pendingCheckpointId. For that, a separate field will be needed to hold the status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe, so it shouldn't be a problem.
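
      A rough sketch of that idea, assuming a status enum plus a lastSeenCheckpointId field (both names are hypothetical). One detail is my own assumption and not part of the proposal above: startPersisting() uses a strict comparison so that a checkpoint whose barrier has already been seen is not reopened.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed guard; not the actual fix.
final class GuardedPersisterSketch {
    enum Status { COMPLETED, PENDING, RECEIVED }

    private Status status = Status.COMPLETED;
    private long lastSeenCheckpointId = -1L;
    final List<String> persisted = new ArrayList<>();

    // Only a strictly newer checkpoint may (re)open persisting.
    void startPersisting(long checkpointId) {
        if (checkpointId > lastSeenCheckpointId) {
            lastSeenCheckpointId = checkpointId;
            status = Status.PENDING;
        }
    }

    // Stale aborts (older id) are ignored.
    void stopPersisting(long checkpointId) {
        if (checkpointId >= lastSeenCheckpointId) {
            lastSeenCheckpointId = checkpointId;
            status = Status.COMPLETED;
        }
    }

    // Stale barriers (older id) are ignored.
    void checkForBarrier(long checkpointId) {
        if (checkpointId >= lastSeenCheckpointId) {
            lastSeenCheckpointId = checkpointId;
            status = Status.RECEIVED;
        }
    }

    void maybePersist(String buffer) {
        if (status == Status.PENDING) {
            persisted.add(buffer);
        }
    }
}
```

      Replaying the first scenario against this sketch (barriers 1 and 2, then start(1), stop(1), start(2)) leaves the status at RECEIVED, so later buffers are not persisted. Replaying the second one (start(2), barrier 1, two buffers, barrier 2) keeps both buffers, because the stale barrier 1 is ignored.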
       

            People

              Roman Khachatryan