Details
- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Fixed
- Version: 1.12.0
Description
In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier() always update pendingCheckpointBarrierId, potentially overwriting a newer id (or the BARRIER_RECEIVED value) with an older one.
For stopPersisting(), consider the following case:
- Two consecutive unaligned checkpoint (UC) barriers arrive at the same channel (the 1st becoming stale at some point)
- In RemoteInputChannel.onBuffer(), the Netty thread updates pendingCheckpointBarrierId to BARRIER_RECEIVED
- The task thread processes the 1st barrier and triggers a checkpoint
- The task thread processes the 2nd barrier and aborts the 1st checkpoint, calling stopPersisting() from the UC controller, which sets pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
- The task thread starts the 2nd checkpoint and calls startPersisting(), setting pendingCheckpointBarrierId to 2
- New buffers may now be included in the 2nd checkpoint, although they belong to the next one
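The stopPersisting() case can be replayed against a simplified model of the persister. This is a hypothetical sketch, not Flink's actual ChannelStatePersister: the class names NaivePersister and StopPersistingScenario, the numeric sentinel values, and the startPersisting() guard that keeps an already received barrier are all assumptions made for illustration.

```java
// Hypothetical, simplified model; NOT Flink's actual ChannelStatePersister.
final class StopPersistingScenario {
    // Replays the stopPersisting() case; returns whether the channel ends up persisting.
    static boolean run() {
        NaivePersister channel = new NaivePersister();
        channel.checkForBarrier(1); // Netty thread: 1st barrier arrives
        channel.checkForBarrier(2); // Netty thread: 2nd barrier arrives
        channel.startPersisting(1); // task thread: 1st checkpoint, state stays BARRIER_RECEIVED
        channel.stopPersisting();   // task thread: 1st checkpoint aborted, state reset
        channel.startPersisting(2); // task thread: 2nd checkpoint, pending id becomes 2
        return channel.isPersisting();
    }

    public static void main(String[] args) {
        // Prints true: buffers arriving after the 2nd barrier would go into checkpoint 2.
        System.out.println("persisting after 2nd barrier: " + run());
    }
}

final class NaivePersister {
    // Sentinel names come from the issue; the numeric values are assumptions.
    static final long CHECKPOINT_COMPLETED = -1L;
    static final long BARRIER_RECEIVED = -2L;

    long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;

    // Assumption: starting a checkpoint keeps an already received barrier.
    void startPersisting(long barrierId) {
        if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
            pendingCheckpointBarrierId = barrierId;
        }
    }

    // Unconditionally resets the state, even if a newer barrier was already received.
    void stopPersisting() {
        pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    }

    // Unconditionally marks a barrier as received, regardless of its id.
    void checkForBarrier(long barrierId) {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    // Incoming buffers are persisted only while a checkpoint id is pending.
    boolean isPersisting() {
        return pendingCheckpointBarrierId >= 0;
    }
}
```

In this model the information that the 2nd barrier was already received is lost once stopPersisting() resets the field, so the subsequent startPersisting(2) reopens persisting for a barrier that has already passed.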
For checkForBarrier(), consider an input gate with two channels, A and B, and two barriers, 1 and 2:
- Channel A receives both barriers; channel B receives nothing yet
- The task thread processes both barriers on A, eventually triggering the 2nd checkpoint
- Channel A's state is now BARRIER_RECEIVED; channel B is pending (with id=2)
- Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
- None of B's buffers between barriers 1 and 2 will be included in the checkpoint
- Channel B receives the 2nd barrier, which will eventually conclude the checkpoint
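The checkForBarrier() case reduces to what happens on channel B. The sketch below is a hypothetical, trimmed-down model (NaivePersister and CheckForBarrierScenario are illustrative names, and the numeric sentinel values are assumptions), not Flink's actual code.

```java
// Hypothetical, simplified model; NOT Flink's actual ChannelStatePersister.
final class CheckForBarrierScenario {
    // Replays the checkForBarrier() case for channel B only.
    static boolean run() {
        NaivePersister channelB = new NaivePersister();
        channelB.startPersisting(2); // task thread: 2nd checkpoint started, B pending with id=2
        channelB.checkForBarrier(1); // Netty thread: stale 1st barrier overwrites pending id 2
        return channelB.isPersisting(); // are B's buffers between barriers 1 and 2 persisted?
    }

    public static void main(String[] args) {
        // Prints false: B's buffers between barriers 1 and 2 are left out of checkpoint 2.
        System.out.println("B persists buffers between barriers 1 and 2: " + run());
    }
}

final class NaivePersister {
    static final long CHECKPOINT_COMPLETED = -1L; // numeric sentinel values are assumptions
    static final long BARRIER_RECEIVED = -2L;

    long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;

    void startPersisting(long barrierId) {
        if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
            pendingCheckpointBarrierId = barrierId;
        }
    }

    // Unconditionally marks a barrier as received, even a stale one.
    void checkForBarrier(long barrierId) {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    // Incoming buffers are persisted only while a checkpoint id is pending.
    boolean isPersisting() {
        return pendingCheckpointBarrierId >= 0;
    }
}
```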
One possible solution is to act only if the passed checkpointId >= pendingCheckpointId. This requires a separate field to hold the status (RECEIVED/COMPLETED/PENDING). Since the class isn't thread-safe anyway, updating two fields instead of one shouldn't be a problem.
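A minimal sketch of the proposed approach, assuming each method receives the checkpoint id it refers to (including stopPersisting(), which takes no id in the description above). GuardedPersister, its Status enum, and GuardedPersisterDemo are hypothetical names, and this is not the fix that was actually merged. The id and the status live in separate fields; calls carrying an id older than the latest one seen are ignored, and startPersisting() for an id whose barrier was already received keeps BARRIER_RECEIVED. Like the original class, the sketch is not thread-safe and assumes callers synchronize.

```java
// Hypothetical sketch of the proposed fix; NOT Flink's actual code.
final class GuardedPersisterDemo {
    public static void main(String[] args) {
        System.out.println("stopPersisting() case, persisting: " + stopPersistingScenario());
        System.out.println("checkForBarrier() case, persisting: " + checkForBarrierScenario());
    }

    // Both barriers already received by the Netty thread before the task thread acts.
    static boolean stopPersistingScenario() {
        GuardedPersister channel = new GuardedPersister();
        channel.checkForBarrier(1);
        channel.checkForBarrier(2);
        channel.startPersisting(1); // stale id: ignored
        channel.stopPersisting(1);  // aborting stale checkpoint 1: ignored
        channel.startPersisting(2); // barrier 2 already received: status stays BARRIER_RECEIVED
        return channel.isPersisting(); // false
    }

    // Channel B is pending with id=2 when the stale 1st barrier arrives.
    static boolean checkForBarrierScenario() {
        GuardedPersister channelB = new GuardedPersister();
        channelB.startPersisting(2);
        channelB.checkForBarrier(1); // stale barrier: ignored, still pending with id=2
        return channelB.isPersisting(); // true
    }
}

final class GuardedPersister {
    enum Status { PENDING, BARRIER_RECEIVED, CHECKPOINT_COMPLETED }

    private long checkpointId = -1L; // latest checkpoint id this channel has seen
    private Status status = Status.CHECKPOINT_COMPLETED;

    void startPersisting(long id) {
        if (id > checkpointId) {
            checkpointId = id;
            status = Status.PENDING;
        }
        // id == checkpointId: the barrier was already seen, so keep the current status.
        // id < checkpointId: stale request, ignored.
    }

    void checkForBarrier(long id) {
        if (id >= checkpointId) { // a stale barrier must not override a newer pending id
            checkpointId = id;
            status = Status.BARRIER_RECEIVED;
        }
    }

    void stopPersisting(long id) {
        if (id >= checkpointId) { // aborting an older checkpoint must not reset newer state
            checkpointId = id;
            status = Status.CHECKPOINT_COMPLETED;
        }
    }

    boolean isPersisting() {
        return status == Status.PENDING;
    }
}
```

In this sketch stale calls become no-ops, so both cases above end in the expected state: no persisting after the 2nd barrier in the first case, and channel B still persisting its buffers between barriers 1 and 2 in the second.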
Issue Links
- is related to:
  - FLINK-19681 Passively timeout alignment on the inputs (Closed)
  - FLINK-20103 Improve test coverage with chaos testing & side-by-side tests (Open)
  - FLINK-22232 Improve test coverage for network stack (Closed)
- links to