Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- 1.14.0, 1.13.2
Description
FLINK-19463 introduced the separation of StateBackend and CheckpointStorage. Before that, both were provided by a single implementation, AbstractFileStateBackend. FsStateBackend was the default implementation before 1.13.
Before 1.13, the checkpoint directory was initialized when the state backend was instantiated (see FsStateBackendFactory). Starting with 1.13, the CheckpointStorage is loaded by the CheckpointStorageLoader.load method, which is called in several places:
- Savepoint disposal (through Checkpoints.loadCheckpointStorage), which relies only on the cluster configuration; no application checkpoint storage is passed.
- SchedulerBase initialization (through DefaultExecutionGraphBuilder), which uses the cluster's configuration and also the application configuration (i.e. the JobGraph's settings). The latter would be considered if CheckpointConfig#configure included the checkpoint storage.
- StreamTask on the TaskManager's side, which uses the configuration passed in by the JobVertex for the application's CheckpointStorage and the TaskManager's configuration (coming from the session cluster) for the fallback CheckpointStorage.
The issue is that the checkpoint directory is not set in the CheckpointConfig, so it is not picked up as a job-level property. As a result, Flink always uses the fallback provided by the session cluster's configuration.
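The lookup order described above can be illustrated with a minimal, self-contained sketch. This is not Flink's actual CheckpointStorageLoader code: the class CheckpointStorageResolution and the method resolveCheckpointDir are hypothetical names invented for illustration, and the sketch only assumes that a job-level directory, when present, should win over the cluster-level `state.checkpoints.dir` setting.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the job-vs-cluster lookup order; not Flink's real loader. */
public class CheckpointStorageResolution {

    // Flink's cluster-level option for the checkpoint directory.
    static final String CHECKPOINT_DIR_KEY = "state.checkpoints.dir";

    /**
     * Returns the job-level directory if one was propagated with the job,
     * otherwise falls back to the cluster configuration.
     */
    static Optional<String> resolveCheckpointDir(
            Optional<String> jobLevelDir, Map<String, String> clusterConfig) {
        if (jobLevelDir.isPresent()) {
            return jobLevelDir;
        }
        return Optional.ofNullable(clusterConfig.get(CHECKPOINT_DIR_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> clusterConfig =
                Map.of(CHECKPOINT_DIR_KEY, "file:///cluster/checkpoints");

        // Situation in this ticket: the job's directory never reaches the
        // job-level settings, so the lookup sees "empty" and uses the fallback.
        System.out.println(
                resolveCheckpointDir(Optional.empty(), clusterConfig).orElse("<none>"));

        // Intended behavior: a propagated job-level directory takes precedence.
        System.out.println(
                resolveCheckpointDir(Optional.of("file:///job/checkpoints"), clusterConfig)
                        .orElse("<none>"));
    }
}
```

In this model the bug corresponds to the first call: because the application's directory is missing from the job-level input, the cluster fallback is always selected, regardless of what the application configured.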
Issue Links
- is caused by: FLINK-19463 Disentangle StateBackends from Checkpointing (Closed)