Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-24506

checkpoint directory is not configurable through the Flink configuration passed into the StreamExecutionEnvironment

    XMLWordPrintableJSON

Details

    • Hide
      FLINK-19463 introduced a change in Flink 1.13+ where the configuration passed in through the StreamExecutionEnvironment wouldn't be considered for the state.checkpoints.dir parameter. This is fixed now bringing back the old behavior which was available in 1.12-.
      Show
      FLINK-19463 introduced a change in Flink 1.13+ where the configuration passed in through the StreamExecutionEnvironment wouldn't be considered for the state.checkpoints.dir parameter. This is fixed now bringing back the old behavior which was available in 1.12-.

    Description

      FLINK-19463 introduced the separation of StateBackend and CheckpointStorage. Before that, both were included in the same interface implementation AbstractFileStateBackend. FsStateBackend was used as a default implementation pre-1.13.

      pre-1.13 initialized the checkpoint directory when instantiating the state backend (see FsStateBackendFactory). Starting from 1.13 loading the CheckpointStorage is done by the CheckpointStorageLoader.load method that is called in various places:

      • Savepoint Disposal (through Checkpoints.loadCheckpointStorage) where it only relies on the configuration passed in by the cluster configuration (no application checkpoint storage is passed)
      • SchedulerBase initialization (through DefaultExecutionGraphBuilder) where it’s based on the cluster’s configuration but also the application configuration (i.e. the JobGraph’s setting) that would be considered if CheckpointConfig#configure would have the checkpoint storage included
      • StreamTask on the TaskManager’s side where it’s based on the configuration passed in by the JobVertex for the application’s CheckpointStorage and the TaskManager’s configuration (coming from the session cluster) for the fallback CheckpointStorage

      The issue is that we don't set the checkpoint directory in the CheckpointConfig. Hence, it's not going to get picked up as a job-related property. Flink always uses the fallback provided by the session cluster configuration.

      Attachments

        Issue Links

          Activity

            People

              mapohl Matthias Pohl
              mapohl Matthias Pohl
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: