StateBackends are assumed to be non-thread-safe and to be accessed from the task thread only.

In ChangelogStateBackend, there are more asynchronous operations. In addition to the usual methods, the task thread is needed for:

• DFS writer: collect the changes uploaded so far; handle upload results after completion
• ChangelogKeyedStateBackend: combine state handles when the writer completes an upload
• ChangelogKeyedStateBackend: materialization, i.e. take a snapshot (sync phase) and handle the results of the async phase
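A rough sketch of the "handle upload results after completion" step, using plain `java.util.concurrent` rather than Flink classes: the upload runs on an I/O pool, and its continuation is handed to a single dedicated thread that stands in for the task thread. The pool, the thread names and the "upload" (which here only reports its thread name) are illustrative assumptions; a single-thread executor only approximates the real task thread, which also runs the regular processing loop.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class UploadHandoffDemo {
    // Runs a simulated upload on an I/O pool and handles its result on a dedicated
    // "task" thread; returns the names of the two threads involved.
    static List<String> run() {
        // Hypothetical stand-in for the DFS writer's upload threads.
        ExecutorService uploadPool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "upload"));
        // Hypothetical stand-in for the task thread: one thread, actions run in order.
        ExecutorService taskThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "task"));
        try {
            return CompletableFuture
                    // Async phase: the (simulated) upload runs off the task thread.
                    .supplyAsync(() -> Thread.currentThread().getName(), uploadPool)
                    // Completion handling (e.g. combining state handles) is handed back
                    // to the task thread instead of running on the upload thread.
                    .thenApplyAsync(
                            uploader -> List.of(uploader, Thread.currentThread().getName()),
                            taskThread)
                    .join();
        } finally {
            uploadPool.shutdown();
            taskThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [upload, task]
    }
}
```

Passing an explicit executor to `thenApplyAsync` is what guarantees the continuation never runs on the upload thread.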

Direct synchronization could be used instead, but executing the actions listed above on the task thread would simplify the code (and likely improve performance).

The only way to do this is via MailboxExecutor, because the task thread runs mail actions in a loop until shutdown.
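To illustrate why running mail actions in a loop gives single-threaded access, here is a minimal mailbox sketch in plain Java. `SimpleMailbox` and its methods are hypothetical and far simpler than Flink's actual MailboxExecutor/mailbox processor (no priorities, yielding or error handling); the point is only that other threads enqueue actions while the task thread alone executes them, so state such as `handles` needs no locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

final class MailboxDemo {
    // Hypothetical, simplified mailbox; not Flink's MailboxExecutor API.
    static final class SimpleMailbox implements Executor {
        private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
        private boolean running = true; // read and written only by the task thread

        // Any thread may enqueue a mail; it runs later on the task thread.
        @Override
        public void execute(Runnable mail) {
            mails.add(mail);
        }

        // Enqueues a final mail that stops the loop after all earlier mails have run.
        void shutdown() {
            mails.add(() -> running = false);
        }

        // Called by the task thread only: runs mails one at a time, in FIFO order.
        void runMailboxLoop() throws InterruptedException {
            while (running) {
                mails.take().run();
            }
        }
    }

    static List<String> run() {
        SimpleMailbox mailbox = new SimpleMailbox();
        List<String> handles = new ArrayList<>(); // mutated only by mails, i.e. on the task thread
        Thread uploader = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                String handle = "handle-" + i;
                mailbox.execute(() -> handles.add(handle)); // no lock needed on `handles`
            }
            mailbox.shutdown();
        }, "upload");
        uploader.start();
        try {
            mailbox.runMailboxLoop(); // the calling thread plays the role of the task thread
            uploader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return handles;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [handle-0, handle-1, handle-2]
    }
}
```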


However, MailboxExecutor is currently created in StreamTask, and its classes reside in flink-streaming-java. So one subtask is to change its creation/lifecycle and move the classes. The target location is flink-core (at least for the interfaces) and flink-runtime/flink-core (for the implementations).


Another subtask is to actually expose the executor to state backends (this can be extracted into a separate task).

StateBackend.createKeyedStateBackend already has an Environment/TaskStateManager argument that could be used.

However, Environment:

1. is available to the user (via getContainingTask);
2. has too wide a scope (e.g. InputGates are not needed in state backends);
3. has too many responsibilities; this is also true for TaskStateManager, which has e.g. reportIncompleteTaskStateSnapshots.

There is probably a better way to expose it.
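One possible shape for such a better way, sketched as a hypothetical narrow interface: the backend receives only an executor bound to the task thread, instead of the whole Environment or TaskStateManager. `TaskThreadContext` and `ExampleChangelogBackend` are invented for illustration and are not Flink APIs; a single-threaded queue stands in for the mailbox.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

final class NarrowContextDemo {
    // Hypothetical narrow view handed to a state backend: only the task-thread executor,
    // none of Environment's other facilities (input gates, containing task, ...).
    interface TaskThreadContext {
        Executor taskThreadExecutor();
    }

    // Hypothetical backend that depends only on TaskThreadContext.
    static final class ExampleChangelogBackend {
        private final Executor taskThread;
        private final List<String> stateHandles = new ArrayList<>(); // task thread only

        ExampleChangelogBackend(TaskThreadContext context) {
            this.taskThread = context.taskThreadExecutor();
        }

        // May be called from an upload thread; the mutation is deferred to the task thread.
        void onUploadCompleted(String handle) {
            taskThread.execute(() -> stateHandles.add(handle));
        }

        List<String> stateHandles() {
            return List.copyOf(stateHandles);
        }
    }

    static List<String> run() {
        // Single-threaded stand-in for the mailbox; ArrayDeque is not thread-safe,
        // which is acceptable only because this demo never leaves one thread.
        Queue<Runnable> mailbox = new ArrayDeque<>();
        Executor mailboxExecutor = mailbox::add;
        ExampleChangelogBackend backend = new ExampleChangelogBackend(() -> mailboxExecutor);

        backend.onUploadCompleted("handle-a");
        backend.onUploadCompleted("handle-b");

        // The "task thread" drains the mailbox, applying the deferred mutations in order.
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
        return backend.stateHandles();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [handle-a, handle-b]
    }
}
```

Keeping the interface to a single method avoids all three issues above: nothing user-facing is reachable through it, its scope is limited to what backends need, and it carries no unrelated responsibilities.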


Note that MailboxExecutor will likely be used in the future in other places, such as ProcessFunction.




            • Assignee:
              ym Yuan Mei
              roman Roman Khachatryan