Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version: 1.12.2
Description
Hi~
I'm trying to use flink-state-processor-api to do state migrations by reading states from an existing savepoint, and writing them into a new savepoint after certain transformations.
However, the read rate is lower than I expected.
When I tried to tune RocksDB by enabling the RocksDB native metrics, I found that the setting had no effect.
After some debugging, I found that when the job runs under a SavepointEnvironment, no RocksDB state backend configuration is passed to RocksDBStateBackend.
The whole process is described below (the code shown is from release-1.12.2):
First, when org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is invoked:
// org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
private StateBackend createStateBackend() throws Exception {
    final StateBackend fromApplication =
            configuration.getStateBackend(getUserCodeClassLoader());

    return StateBackendLoader.fromApplicationOrConfigOrDefault(
            fromApplication,
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getUserCodeClassLoader(),
            LOG);
}
getEnvironment() returns a SavepointEnvironment instance.
Then org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo is invoked, which returns a new org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo instance.
// org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    return new SavepointTaskManagerRuntimeInfo(getIOManager());
}
Finally, org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration is invoked. It returns an empty configuration, which means every user-supplied configuration option is lost.
// org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
@Override
public Configuration getConfiguration() {
    return new Configuration();
}
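
To make the effect of the call chain above easier to see, here is a minimal, self-contained sketch that models it with plain Java maps. It is not Flink code: ConfigLossSketch, RuntimeInfo, EmptyRuntimeInfo, ForwardingRuntimeInfo and resolveOption are hypothetical stand-ins, and the option key is only used as an example of a RocksDB native metrics setting. The forwarding variant is included purely as a contrast and is an assumption, not a description of how Flink does or should handle this.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the lookup chain in this report; none of these types are Flink classes. */
public class ConfigLossSketch {

    /** Hypothetical stand-in for TaskManagerRuntimeInfo. */
    interface RuntimeInfo {
        Map<String, String> getConfiguration();
    }

    /** Mirrors the reported behavior: always returns a fresh, empty configuration. */
    static final class EmptyRuntimeInfo implements RuntimeInfo {
        @Override
        public Map<String, String> getConfiguration() {
            return new HashMap<>();
        }
    }

    /** Hypothetical contrast: forwards the configuration it was constructed with. */
    static final class ForwardingRuntimeInfo implements RuntimeInfo {
        private final Map<String, String> configuration;

        ForwardingRuntimeInfo(Map<String, String> configuration) {
            // Defensive copy so later changes by the caller do not leak in.
            this.configuration = Collections.unmodifiableMap(new HashMap<>(configuration));
        }

        @Override
        public Map<String, String> getConfiguration() {
            return configuration;
        }
    }

    /** Stand-in for a state backend reading one option and falling back to its default. */
    static String resolveOption(RuntimeInfo info, String key, String defaultValue) {
        return info.getConfiguration().getOrDefault(key, defaultValue);
    }

    public static void main(String[] args) {
        // Example key only; assumed here to represent a RocksDB native metrics option.
        String key = "state.backend.rocksdb.metrics.block-cache-usage";
        Map<String, String> userConfig = Collections.singletonMap(key, "true");

        // The user's setting never reaches the lookup, so the default wins.
        System.out.println(resolveOption(new EmptyRuntimeInfo(), key, "false")); // prints false

        // With the configuration forwarded, the user's setting is visible.
        System.out.println(resolveOption(new ForwardingRuntimeInfo(userConfig), key, "false")); // prints true
    }
}
```

In this model, the first lookup falls back to the default even though the user set the option, which matches the symptom described above: the metrics setting is silently ignored when the job runs under a SavepointEnvironment.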