Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Labels:
      None

      Description

      Follow up for FLINK-4512.

      Store checkpoint metadata in the checkpoint directory. That makes it simpler for users to track and clean up checkpoints manually if they want to retain externalized checkpoints across cancellations and terminal failures.

      Every state backend needs to be able to provide a storage location for the checkpoint metadata. The memory state backend would hence not work with externalized checkpoints unless a location is set explicitly via a parameter `setExternalizedCheckpointsLocation(uri)`.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2752

          FLINK-4814 [checkpointing] Use checkpoint directory for externalized checkpoints

          This change drops the checkpoint directory configuration key and instead uses the configured checkpoint directory of the used backend (for `FsStateBackend` and `RocksDBBackend`). For backends without a checkpoint directory like the `MemoryStateBackend`, you have to explicitly configure a checkpoint directory. Otherwise, the job submission will fail.

          The externalized checkpoints now use the `FsCheckpointOutputStream`, too. This gives externalized checkpoints a clean layout, because the checkpoint metadata ends up next to the actual checkpoint data:
          ```
          :checkpointDir/:jobId/chk-:checkpointId/
          +- :uuid // data
          .
          +- :uuid // data
          +- savepoint-:uuid // meta data
          ```
          The checkpoint metadata and the actual data are self-contained in a single directory.
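
          The layout above can be sketched as plain string composition. This is only an illustration of the naming scheme described in the pull request, not code from the patch: the class `ExternalizedCheckpointLayout` and its methods are hypothetical names, and paths are treated as opaque strings so that URI schemes such as `hdfs://` are left untouched.

```java
import java.util.UUID;

// Hypothetical helper (not a Flink class): composes the paths of the
// ":checkpointDir/:jobId/chk-:checkpointId/" layout described above.
final class ExternalizedCheckpointLayout {

    /** Returns ":checkpointDir/:jobId/chk-:checkpointId" without a trailing slash. */
    static String checkpointDirectory(String checkpointDir, String jobId, long checkpointId) {
        // Drop a single trailing slash so the result never contains "//" at the join.
        String base = checkpointDir.endsWith("/")
                ? checkpointDir.substring(0, checkpointDir.length() - 1)
                : checkpointDir;
        return base + "/" + jobId + "/chk-" + checkpointId;
    }

    /** Data files are named by a bare UUID inside the checkpoint directory. */
    static String dataFile(String checkpointDirectory, UUID uuid) {
        return checkpointDirectory + "/" + uuid;
    }

    /** The metadata file carries the "savepoint-" prefix inside the same directory. */
    static String metadataFile(String checkpointDirectory, UUID uuid) {
        return checkpointDirectory + "/savepoint-" + uuid;
    }
}
```

          Because data and metadata share one parent directory, cleaning up a retained checkpoint manually amounts to deleting that single `chk-:checkpointId` directory.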

          Note that this currently also changes the target file for savepoints. Before this change you get
          ```
          :savepointDir/savepoint-:rand
          ```
          After this change you get
          ```
          :savepointDir/:jobId/chk-:checkpointId/savepoint-:uuid
          ```
          Is this OK to change?

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 4814-external_checkpoint_config

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2752.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2752


          commit b5d99b8b70ffc5a61a0d3bae20777ed2893313f3
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-10-31T12:58:03Z

          FLINK-4814 [refactoring] Add prefix option to FsCheckpointOutputStreams

          • Allows to configure a prefix for generated file names
          • Add a method to delete the created checkpoint directory

          commit 7acbc970a72356eea49f603654f324dd8931eaf6
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-11-01T15:54:17Z

          FLINK-4814 [refactoring] Use FsStreamFactory and Path in SavepointStore

          • Use the FsStreamFactory instead of manually working with the FileSystem
          • Use Path instead of String for path arguments

          commit f35eb0f906d88fdb724a4b17b1983d1af4c99f96
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-11-01T16:51:02Z

          FLINK-4814 [checkpointing] Use checkpoint directory for externalized checkpoints

          • Removes the config key for the checkpoint directory
          • Use the backend checkpoint directory for externalized checkpoints (fs, rocksDB)
          • With the mem backend, manual configuration is required

          commit e55fb2ec5444003e114ba0ee90ca4b148c9f1d00
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-11-02T15:21:06Z

          FLINK-4814 [docs] Add docs about externalized checkpoints


          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2752

          @StephanEwen Do you have time to look at this? This exposes the checkpoint directories via the state backend and lets the JobManager use that one for checkpoint directory configurations.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2752

          I think this is a very nice addition, but it probably needs a bit of refinement:

          • I heard from various users that they would like to have a config option for the checkpoint directory, over a configuration "in code"
          • I would suggest to actually retain the `state.checkpoints.dir` config parameter, and make every state backend respect it for externalized checkpoints
          • State backends that write to a file system (FsStateBackend, RocksDB, ...) would respect that parameter (we might even want to deprecate their specific checkpoint dir parameter)
          • State backends that do not write to files will be fine with this parameter being absent unless one chooses to enable externalized checkpoints
          • The "in code" option overrides the config option if both are set.
          • The option to enable externalized checkpoints should also be present in the configuration.

          What would be nice is to merge the documentation, as far as it is applicable to the current state.
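
          The precedence rules proposed in the list above can be sketched as follows. This is a minimal illustration of the proposal only, not Flink's actual behavior: the class `CheckpointDirResolver` and its `resolve` method are hypothetical, and the configuration is modeled as a plain `Map` rather than Flink's `Configuration` type. Only the key `state.checkpoints.dir` is taken from the comment.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch (not a Flink class) of the proposed resolution rules.
final class CheckpointDirResolver {

    static final String CHECKPOINTS_DIR_KEY = "state.checkpoints.dir";

    /**
     * @param inCodeDir           directory set programmatically, or null if unset
     * @param config              flat key/value view of the configuration file
     * @param externalizedEnabled whether externalized checkpoints are enabled
     */
    static Optional<String> resolve(
            String inCodeDir, Map<String, String> config, boolean externalizedEnabled) {
        // The "in code" option wins over the config option if both are set.
        String dir = inCodeDir != null ? inCodeDir : config.get(CHECKPOINTS_DIR_KEY);

        // A missing directory is only an error once externalized checkpoints are enabled.
        if (dir == null && externalizedEnabled) {
            throw new IllegalStateException(
                    "Externalized checkpoints are enabled, but no checkpoint directory is set "
                            + "(neither in code nor via '" + CHECKPOINTS_DIR_KEY + "').");
        }
        return Optional.ofNullable(dir);
    }
}
```

          Under this sketch a backend that does not write to files resolves to an empty result and keeps working as long as externalized checkpoints stay disabled.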

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2752

          OK, I agree that it's better to leave it as is for the release. With the upcoming feature freeze, I don't think there is enough time to properly address this. I will cherry pick the docs changes for the release and we can address this after 1.2.

          githubbot ASF GitHub Bot added a comment -

          Github user uce closed the pull request at:

          https://github.com/apache/flink/pull/2752

          elevy Elias Levy added a comment -

          Wondering what is the status of this? The current state, as pointed out in FLINK-5627, is quite odd. You can configure the checkpoint directory programmatically, but not the externalized checkpoint directory. Will it make it into 1.4?

          aljoscha Aljoscha Krettek added a comment -

          Yes I think this is still a problem and I think it's part of a larger problem that I'm trying to address in this one-pager: https://docs.google.com/document/d/1Q8hkpl2VVXrbEaJkQNe88kuus0F5wFFiebub9hqrg1s/edit?usp=sharing

          I'm just laying out what I would like to have, no solutions yet, though. What do you think?

          elevy Elias Levy added a comment -

          Yes please. That would be helpful. I would suggest multiple levels of overriding. E.g. the cluster config can provide defaults, as it does now for some things. You should be able to package a config file in your job's resources that overrides the cluster's defaults. You should also be able to pass the job a config file at run time to override values in both of those. And command line arguments and programmatically set values should override everything else.

          It would be good for the job to print its configuration on start up to aid debugging of settings, like the Kafka producer/consumer do.
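
          The layering and the start-up printout suggested above could look roughly like this. It is a sketch of the suggestion, not an existing Flink facility: `LayeredConfig`, `merge`, and `describe` are hypothetical names, and each layer is modeled as a plain `Map`. The relative order of command line arguments and programmatically set values is not specified in the comment, so the caller decides it here by the order in which the layers are passed.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not a Flink class) of layered configuration overriding.
final class LayeredConfig {

    /**
     * Merges layers in order of increasing precedence: a key set in a later
     * layer overrides the same key from any earlier layer.
     */
    @SafeVarargs
    static Map<String, String> merge(Map<String, String>... layers) {
        // TreeMap keeps keys sorted, which makes the printed output stable.
        Map<String, String> effective = new TreeMap<>();
        for (Map<String, String> layer : layers) {
            effective.putAll(layer);
        }
        return effective;
    }

    /** Renders the effective configuration, one "key = value" pair per line. */
    static String describe(Map<String, String> effective) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : effective.entrySet()) {
            sb.append(entry.getKey()).append(" = ").append(entry.getValue()).append('\n');
        }
        return sb.toString();
    }
}
```

          A job would call something like `merge(clusterDefaults, jobResources, runtimeFile, cliAndProgrammatic)` and log the result of `describe` once at start-up, so that the value that actually took effect for each key is visible when debugging.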

          Also, as I mentioned in the mailing list, the mini cluster used in the local execution environment doesn't load a config file, so there are things, like external checkpoints, that can't be tested at the moment from within an IDE.

          API-wise it could also be more consistent. At the moment you configure some settings via the stream environment (e.g. env.setStreamTimeCharacteristic()), others via env.getConfig.foo, more via env.getCheckpointConfig.bar, and some only via the config file.

          aljoscha Aljoscha Krettek added a comment -

          Yes, those are things I'm aware of and I also find them quite annoying.


            People

            • Assignee:
              Unassigned
              Reporter:
              uce Ufuk Celebi
            • Votes:
              1
              Watchers:
              5
