StateBackends are assumed to be not thread-safe and to be accessed from the task thread only.
ChangelogStateBackend adds further async operations. In addition to the usual methods, the task thread is needed for:
- DFS writer: collecting the changes uploaded so far; handling upload results after completion
- ChangelogKeyedStateBackend: combining state handles once the writer completes an upload
- ChangelogKeyedStateBackend: materialization - taking the snapshot (sync phase); handling the results of the async phase
Direct synchronization can be used instead, but executing the operations listed above in the task thread would simplify the code (and likely improve performance).
The only way to do this is via MailboxExecutor (because the task thread runs mail actions in a loop until shutdown).
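The hand-off can be illustrated with a minimal, self-contained sketch. It does not use Flink's actual MailboxExecutor API: SimpleMailbox, UploadTracker and runDemo are hypothetical names introduced only for illustration, and the "upload" is a stand-in that just returns a string. The point is that the async part runs on an I/O thread, while the result is handled by the single task thread, so the handler needs no locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MailboxSketch {

    /** Hypothetical stand-in for a mailbox executor: execute() only enqueues a mail. */
    static final class SimpleMailbox implements Executor {
        private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
        private boolean running = true; // read and written by the task thread only

        @Override
        public void execute(Runnable mail) {
            mails.add(mail); // safe to call from any thread
        }

        /** Called by the task thread: run mails one at a time until shutdown. */
        void runLoop() throws InterruptedException {
            while (running) {
                mails.take().run();
            }
        }

        /** Shutdown is itself a mail, so the flag is flipped on the task thread. */
        void shutdown() {
            execute(() -> running = false);
        }
    }

    /** Deliberately not thread-safe, like a state backend. */
    static final class UploadTracker {
        final List<String> confirmed = new ArrayList<>();
        final List<String> handlerThreads = new ArrayList<>();

        void onUploaded(String handle) {
            confirmed.add(handle);
            handlerThreads.add(Thread.currentThread().getName());
        }
    }

    static UploadTracker runDemo() {
        SimpleMailbox mailbox = new SimpleMailbox();
        UploadTracker tracker = new UploadTracker();
        ExecutorService io = Executors.newSingleThreadExecutor();

        Thread taskThread = new Thread(() -> {
            try {
                mailbox.runLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task-thread");
        taskThread.start();

        try {
            CompletableFuture
                    .supplyAsync(() -> "handle-1", io)              // async phase on the I/O thread
                    .thenAcceptAsync(tracker::onUploaded, mailbox)  // result handled as a mail
                    .get();
            mailbox.shutdown();
            taskThread.join(); // join() makes the tracker's state visible to the caller
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return tracker;
    }

    public static void main(String[] args) {
        UploadTracker tracker = runDemo();
        System.out.println(tracker.confirmed + " handled on " + tracker.handlerThreads);
    }
}
```

Because every mutation of UploadTracker happens inside a mail, the tracker can stay free of synchronization, which is the simplification argued for above.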
However, MailboxExecutor is currently created in StreamTask, and its classes reside in flink-streaming-java. So one subtask is to change its creation/lifecycle and move the classes. The target location is flink-core (at least for interfaces) and flink-runtime/flink-core (for implementations).
Another subtask is to actually expose the executor to state backends (can be extracted into a separate task).
StateBackend.createKeyedStateBackend already has an Environment/TaskStateManager argument which could be used. However, Environment:
- is available to the user (via getContainingTask)
- has too wide a scope (e.g. InputGates are not needed in state backends)
- has too many responsibilities - this is also true for TaskStateManager, which has e.g. reportIncompleteTaskStateSnapshots
Probably, there is a better way to expose it.
Note that MailboxExecutor will likely be used in other places in the future, such as ProcessFunction.