Flink / FLINK-25400

RocksDBStateBackend configuration does not take effect with SavepointEnvironment


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.12.2
    • Fix Version/s: None
    • Component/s: API / State Processor
    • Labels: None

    Description

      Hi~

      I'm trying to use flink-state-processor-api to migrate state: reading state from an existing savepoint, applying some transformations, and writing the result to a new savepoint.

      However, the read throughput is lower than I expected.

      When I tried to enable RocksDB native metrics to help tune RocksDB, the settings had no effect.

      After some debugging, I found that when the job runs under a SavepointEnvironment, no RocksDBStateBackend configuration is passed to RocksDBStateBackend.

      The whole process is described below (the code shown is from release-1.12.2):

      First, when org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is invoked:

       

      // org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
      private StateBackend createStateBackend() throws Exception {
          final StateBackend fromApplication =
                  configuration.getStateBackend(getUserCodeClassLoader());

          // The configuration passed here comes from the environment's TaskManagerRuntimeInfo.
          return StateBackendLoader.fromApplicationOrConfigOrDefault(
                  fromApplication,
                  getEnvironment().getTaskManagerInfo().getConfiguration(),
                  getUserCodeClassLoader(),
                  LOG);
      }

      getEnvironment() returns a SavepointEnvironment instance.

       

      Next, org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo is invoked, which returns a new org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo instance.

       

      // org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
      @Override
      public TaskManagerRuntimeInfo getTaskManagerInfo() {
          return new SavepointTaskManagerRuntimeInfo(getIOManager());
      } 

       

      Finally, org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration is invoked. It returns an empty Configuration, which means every configured option, including the RocksDB ones, is lost.

      // org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
      @Override
      public Configuration getConfiguration() {
          return new Configuration();
      } 
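
      The effect of this call chain can be reproduced without Flink. The following is only a minimal sketch: RuntimeInfo, EmptyRuntimeInfo, ForwardingRuntimeInfo and nativeMetricEnabled are hypothetical stand-ins (a plain Map replaces Flink's Configuration), not Flink classes, and the forwarding variant is one possible direction for a fix, not the project's actual one. The option key is one of the RocksDB native metric options.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SavepointConfigSketch {

    // Hypothetical stand-in for TaskManagerRuntimeInfo; only the config accessor matters here.
    interface RuntimeInfo {
        Map<String, String> getConfiguration();
    }

    // Models the behavior described above: a fresh, empty configuration on every call.
    static final class EmptyRuntimeInfo implements RuntimeInfo {
        @Override
        public Map<String, String> getConfiguration() {
            return new HashMap<>();
        }
    }

    // Hypothetical alternative: keep a copy of the configuration the job was started with.
    static final class ForwardingRuntimeInfo implements RuntimeInfo {
        private final Map<String, String> configuration;

        ForwardingRuntimeInfo(Map<String, String> configuration) {
            this.configuration = Collections.unmodifiableMap(new HashMap<>(configuration));
        }

        @Override
        public Map<String, String> getConfiguration() {
            return configuration;
        }
    }

    // Models a backend reading one boolean option; an absent key falls back to false.
    static boolean nativeMetricEnabled(RuntimeInfo info, String key) {
        return Boolean.parseBoolean(info.getConfiguration().getOrDefault(key, "false"));
    }

    public static void main(String[] args) {
        String key = "state.backend.rocksdb.metrics.estimate-num-keys";
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put(key, "true");

        // The user's setting is invisible when the runtime info hands out an empty config.
        System.out.println(nativeMetricEnabled(new EmptyRuntimeInfo(), key));
        // It becomes visible once the configuration is forwarded.
        System.out.println(nativeMetricEnabled(new ForwardingRuntimeInfo(userConfig), key));
    }
}
```

      In this sketch the first lookup falls back to the default because the key is absent, which mirrors how the native metrics setting silently has no effect under SavepointEnvironment.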

       

      People

            Assignee: Unassigned
            Reporter: wuzhiyu
            Votes: 0
            Watchers: 2