Flink / FLINK-5053

Incremental / lightweight snapshots for checkpoints

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: None
    • Labels:
      None

      Description

      There is currently basically no difference between savepoints and checkpoints in Flink and both are created through exactly the same process.

      However, savepoints and checkpoints have a slightly different meaning which we should take into account to keep Flink efficient:

      • Savepoints are (typically infrequently) triggered by the user to create a state from which the application can be restarted, e.g. because the Flink version, the user code, or the parallelism needs to change.
      • Checkpoints are (typically frequently) triggered by the system to allow for fast recovery in case of failure, while keeping the job and system unchanged.

      This means that savepoints and checkpoints can have different properties in that:

      • Savepoints should represent a state of the application from which characteristics of the job (e.g. parallelism) can be adjusted for the next restart. One example of something savepoints need to be aware of is key-groups. Savepoints can be somewhat more expensive than checkpoints, because the user usually creates them far less frequently.
      • Checkpoints are frequently triggered by the system to allow for fast failure recovery. However, failure recovery leaves all characteristics of the job unchanged, so checkpoints do not have to be aware of those; think again of key-groups. Checkpoints should run faster than savepoint creation; in particular, it would be nice to have incremental checkpoints.
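The distinction above can be captured as a small set of per-snapshot properties. The following is a hypothetical sketch; the class and method names are illustrative and not Flink's actual API:

```java
// Illustrative sketch (not Flink's real CheckpointProperties): a savepoint
// stays key-group-aware so the job can be rescaled on restart, while a
// checkpoint may trade that awareness for speed and incrementality.
public class SnapshotProperties {
    private final boolean keyGroupAware;
    private final boolean incremental;

    private SnapshotProperties(boolean keyGroupAware, boolean incremental) {
        this.keyGroupAware = keyGroupAware;
        this.incremental = incremental;
    }

    // Savepoints: user-triggered, full, key-group-aware snapshots.
    public static SnapshotProperties forSavepoint() {
        return new SnapshotProperties(true, false);
    }

    // Checkpoints: system-triggered, may drop key-group awareness
    // and upload only incremental changes.
    public static SnapshotProperties forCheckpoint() {
        return new SnapshotProperties(false, true);
    }

    public boolean isKeyGroupAware() { return keyGroupAware; }
    public boolean isIncremental() { return incremental; }
}
```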

      For a first approach, I would suggest the following steps/changes:

      • In checkpoint coordination: differentiate between triggering checkpoints
        and savepoints. Introduce properties for checkpoints that describe their set of abilities, e.g. "is-key-group-aware", "is-incremental".
      • In state handle infrastructure: introduce state handles that reflect incremental checkpoints and drop full key-group awareness, i.e. covering folders instead of files and not having keygroup_id -> file/offset mapping, but keygroup_range -> folder?
      • Backend side: We should start with RocksDB by reintroducing something similar to semi-async snapshots, but using BackupableDBOptions::setShareTableFiles(true) and transferring only the new incremental outputs to HDFS. Note that using RocksDB's internal backup mechanism gives up the information about individual key-groups. As explained above, this should be perfectly acceptable for checkpoints, while savepoints should use the key-group-aware, fully async mode. Of course, we also need to implement the ability to restore from both types of snapshots.
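The incremental-transfer idea from the last step can be illustrated independently of RocksDB: with shared table files, only files that were not part of the previous checkpoint need to be uploaded, while already-uploaded files are merely re-referenced. A minimal sketch, assuming files are identified by name:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch, not Flink's implementation: compute which snapshot
// files a new incremental checkpoint must actually transfer to HDFS.
public class IncrementalUpload {
    // Files present in the current snapshot but not yet uploaded by any
    // previous checkpoint must be transferred; the rest are re-referenced.
    public static Set<String> filesToUpload(Set<String> currentFiles,
                                            Set<String> previouslyUploaded) {
        Set<String> toUpload = new HashSet<>(currentFiles);
        toUpload.removeAll(previouslyUploaded);
        return toUpload;
    }
}
```

For example, if checkpoint N uploaded {000001.sst, 000002.sst} and checkpoint N+1 contains {000002.sst, 000003.sst}, only 000003.sst is transferred.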

      One remaining problem with the suggested approach is that even checkpoints should support scale-down, in case fewer instances are available for recovery after a failure.

        Activity

        Xiaogang Shi added a comment -

        I think it's better to use checkpoint instead of backup to perform incremental checkpointing of rocksdb. The checkpoint method will create hard links for all living files, without the need to copy files. Hence it can help reduce the time taken in the synchronous part.

        What do you think?
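The hard-link behavior described in this comment can be sketched with plain Java NIO. This is an illustration of the mechanism, not RocksDB code; RocksDB's Checkpoint feature hard-links the live SST files into the checkpoint directory, so the synchronous phase copies no data:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch: a hard link points at the same inode as the
// original file, so "snapshotting" it is O(1) per file and copies no bytes.
public class HardLinkDemo {
    // Hard-links 'original' at 'link' and returns the bytes visible
    // through the link, demonstrating both names share the same data.
    public static String snapshotViaHardLink(Path original, Path link)
            throws IOException {
        Files.createLink(link, original); // the cheap synchronous part
        return new String(Files.readAllBytes(link));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ckpt-demo");
        Path sst = dir.resolve("000001.sst");
        Files.write(sst, "sst-contents".getBytes());
        System.out.println(snapshotViaHardLink(sst, dir.resolve("link-000001.sst")));
    }
}
```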

        Stefan Richter added a comment -

        That might very well be; I am still planning to take a closer look at RocksDB's backup/checkpoint features anyway before I start working on this. Until now, this description is more of a rough outline of my plan, intended for discussion. But thanks for the hint!

        Xiaogang Shi added a comment - edited

        Stefan Richter Do you have a more detailed plan about incremental checkpoints?

        I think much more work is needed to make it work. One big problem is concurrent modification by the TaskExecutors and the JobMaster.

        Currently, the state handles as well as the snapshot data (the files on HDFS) are both deleted by the JobMaster. With incremental checkpoints, a file may be shared by several checkpoints, and concurrent access to such files may lead to incorrect results. For example, the JobMaster may delete a file that a TaskExecutor assumed was already on HDFS and therefore did not re-upload.

        One method is to synchronize the access of the JobMaster and the TaskExecutors. Another solution, I think, is to let the TaskExecutors delete the snapshot files. That way, all access to the snapshot data is made by the TaskExecutors, avoiding the need for synchronization.

        Do you have any idea about this problem?
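One common way to make TaskExecutor-side ownership safe is to reference-count shared snapshot files, so a file is physically deleted only when no checkpoint references it anymore. A minimal sketch; the class and method names are illustrative, not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: reference-count shared snapshot files so that a
// file shared by several incremental checkpoints is deleted exactly once,
// and only after the last checkpoint referencing it is discarded.
public class SharedFileRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Called when a checkpoint registers a file it references.
    public synchronized void register(String file) {
        refCounts.merge(file, 1, Integer::sum);
    }

    // Called when a checkpoint is discarded. Returns true iff the count
    // dropped to zero, i.e. the file may now be deleted from HDFS.
    public synchronized boolean unregister(String file) {
        Integer count = refCounts.get(file);
        if (count == null) {
            return false; // unknown file, nothing to delete
        }
        if (count == 1) {
            refCounts.remove(file);
            return true;
        }
        refCounts.put(file, count - 1);
        return false;
    }
}
```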

        Vishnu Viswanath added a comment -

        Is this task complete? I see that all the pull requests for the subtasks are merged. If the work is complete, I would like to try it out. I have been waiting for incremental checkpoints for one of the tasks I am working on.

        Stefan Richter added a comment -

        I have opened this JIRA to track everything that should still be fixed before the release: https://issues.apache.org/jira/browse/FLINK-6537. Most of the work is already done in https://github.com/apache/flink/pull/3870.


          People

          • Assignee: Xiaogang Shi
          • Reporter: Stefan Richter
          • Votes: 1
          • Watchers: 16
