Flink / FLINK-5820

Extend State Backend Abstraction to support Global Cleanup Hooks


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.5.0
    • Labels: None

    Description

      The current state backend abstraction has the limitation that each piece of state is only meaningful in the context of its state handle. There is no possibility of a view onto "all state associated with checkpoint X".

      That causes several issues:

      • State might not be cleaned up in the process of failures. When a TaskManager hands over a state handle to the JobManager and either of them has a failure, the state handle may be lost and state lingers.
      • State might also linger if a cleanup operation failed temporarily and the checkpoint metadata was already disposed.
      • State cleanup is more expensive than necessary in many cases. Each state handle is individually released. For large jobs, this means 1000s of release operations (typically file deletes) per checkpoint, which can be expensive on some file systems.
      • It is hard to guarantee cleanup of parent directories with the current architecture.
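The cleanup-cost point above can be illustrated with a small sketch. This is not Flink code; the helper names and temp-directory paths are assumptions for illustration. It contrasts releasing each state file individually (one filesystem operation per handle) with grouping a checkpoint's exclusive state under one directory so cleanup is a single recursive delete:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class CleanupCost {

    // Per-handle cleanup: one delete operation per state file, plus one
    // for the directory itself. For large jobs this means thousands of
    // operations per checkpoint.
    static int deleteIndividually(Path dir) throws IOException {
        int ops = 0;
        try (Stream<Path> files = Files.list(dir)) {
            for (Path f : (Iterable<Path>) files::iterator) {
                Files.delete(f);
                ops++;
            }
        }
        Files.delete(dir);
        return ops + 1;
    }

    // Directory-scoped cleanup: logically a single recursive delete of
    // the checkpoint's directory.
    static void deleteRecursively(Path dir) throws IOException {
        try (Stream<Path> tree = Files.walk(dir)) {
            tree.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        Path chk1 = Files.createTempDirectory("chk-1");
        for (int i = 0; i < 5; i++) Files.createFile(chk1.resolve("state-" + i));
        System.out.println(deleteIndividually(chk1)); // 6 (5 files + the directory)

        Path chk2 = Files.createTempDirectory("chk-2");
        for (int i = 0; i < 5; i++) Files.createFile(chk2.resolve("state-" + i));
        deleteRecursively(chk2);
        System.out.println(Files.exists(chk2)); // false
    }
}
```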

      The core changes proposed here are:

      1. Each job has one core StateBackend. In the future, operators may have different KeyedStateBackends and OperatorStateBackends to mix and match, for example, RocksDB storage and in-memory storage.

      2. The JobManager needs to be aware of the StateBackend.

      3. Storing checkpoint metadata becomes the responsibility of the state backend, not the "completed checkpoint store". The latter only stores the pointers to the latest available checkpoints (either in process or in ZooKeeper).

      4. The StateBackend may optionally have a hook to drop all checkpointed state that belongs to only one specific checkpoint (shared state comes as part of incremental checkpointing).

      5. The StateBackend needs to have a hook to drop all checkpointed state up to a specific checkpoint (for all previously discarded checkpoints).

      6. In the future, this must support periodic cleanup hooks that track orphaned shared state from incremental checkpoints.
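Hooks (4) and (5) above could look roughly like the following in-memory sketch. The class and method names are illustrative assumptions, not the actual API that came out of this issue:

```java
import java.util.Set;
import java.util.TreeMap;

public class CheckpointCleanupSketch {

    // checkpoint id -> state exclusive to that checkpoint; shared state
    // (from incremental checkpoints) would live outside this map.
    private final TreeMap<Long, String> exclusiveState = new TreeMap<>();

    void addCheckpoint(long id, String state) {
        exclusiveState.put(id, state);
    }

    // Hook (4): drop state that belongs to exactly one checkpoint,
    // leaving shared state untouched.
    void disposeCheckpoint(long id) {
        exclusiveState.remove(id);
    }

    // Hook (5): drop all state up to and including the given checkpoint,
    // covering previously discarded checkpoints whose cleanup may have
    // failed temporarily.
    void disposeAllUpTo(long id) {
        exclusiveState.headMap(id, true).clear();
    }

    Set<Long> remainingCheckpoints() {
        return exclusiveState.keySet();
    }

    public static void main(String[] args) {
        CheckpointCleanupSketch backend = new CheckpointCleanupSketch();
        backend.addCheckpoint(1, "chk-1 data");
        backend.addCheckpoint(2, "chk-2 data");
        backend.addCheckpoint(3, "chk-3 data");
        backend.disposeAllUpTo(2);
        System.out.println(backend.remainingCheckpoints()); // [3]
    }
}
```

The distinction between the two hooks matters for incremental checkpoints: hook (4) must not touch shared state, while hook (5) gives the backend a chance to reclaim everything older than a known-retained checkpoint in one pass.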

      For the FsStateBackend, which currently stores most of the checkpointed state (transitively for RocksDB as well), this means re-structuring the storage directories as follows:

      ../<flink-checkpoints>/job1-id/
                                    /shared/    <-- shared checkpoint data
                                    /chk-1/...  <-- data exclusive to checkpoint 1
                                    /chk-2/...  <-- data exclusive to checkpoint 2
                                    /chk-3/...  <-- data exclusive to checkpoint 3
      
      ../<flink-checkpoints>/job2-id/
                                    /shared/...
                                    /chk-1/...
                                    /chk-2/...
                                    /chk-3/...
      
      ../<flink-savepoints>/savepoint-1/savepoint-root
                                       /file-1-uid
                                       /file-2-uid
                                       /file-3-uid
                           /savepoint-2/savepoint-root
                                       /file-1-uid
                                       /file-2-uid
                                       /file-3-uid
      
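      Paths in the layout above could be derived with helpers like the following sketch. The root directory and helper names are assumptions for illustration only:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class CheckpointLayout {

    // Data shared across checkpoints of one job (incremental checkpoints).
    static Path sharedDir(Path checkpointsRoot, String jobId) {
        return checkpointsRoot.resolve(jobId).resolve("shared");
    }

    // Data exclusive to one checkpoint; dropping that checkpoint becomes
    // a single recursive delete of this directory.
    static Path exclusiveDir(Path checkpointsRoot, String jobId, long checkpointId) {
        return checkpointsRoot.resolve(jobId).resolve("chk-" + checkpointId);
    }

    public static void main(String[] args) {
        Path root = Paths.get("flink-checkpoints");
        System.out.println(exclusiveDir(root, "job1-id", 2));
        System.out.println(sharedDir(root, "job1-id"));
    }
}
```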

      This is the umbrella issue for the individual steps needed to address this.

            People

              Assignee: Stephan Ewen (sewen)
              Reporter: Stephan Ewen (sewen)
