FLINK-23139 adds private state management capabilities to TM.
However, it does not consider multiple retained checkpoints.
In most cases, it should work correctly:
- TMs will not discard the state of the previous checkpoints if it's not used in the latest one - becase they are not aware of it
- If some state is reused (incremental checkpoints); it will be discarded by TMs on latest checkpoint subsumption - which means that the previous checkpoints are subsumed too
However, JM will also try to discard the state on subsumption (it's not shared between TMs).
So, the state will be removed, it will not be removed prematurely, but there can be race conditions.
The simplest way to solve this is to ignore (log) discard errors.
Other options include:
- treat all state after recovery as "distributed", so TMs won't discard it
- compute the intersection between the checkpoints and pass it to TMs as distributed state, so they won't discard it
- compute the intersection between the checkpoints and prevent JM from discarding it
- pass all recovered snapshots from JM to TM on recovery