Details
Type: Sub-task
Status: Closed
Priority: Blocker
Resolution: Fixed
Description
Currently, calls on the Context in the OperatorCoordinator go directly and synchronously to the ExecutionGraph.
This causes two critical problems:
- It is common that the code in the OperatorCoordinator runs in a separate thread (for example, because it executes blocking operations). Calling the Scheduler from another thread causes the Scheduler to crash (AssertionError, violation of the single-threaded property).
- Calls on the ExecutionGraph are being removed as part of removing the legacy scheduler. Certain calls no longer work.
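The first problem can be illustrated with a minimal sketch. All names here (`MainThreadSketch`, `SingleThreadedScheduler`, `reportFailure`) are hypothetical stand-ins rather than Flink classes, and a plain single-thread `ExecutorService` is assumed in place of the real main thread executor. A direct call from a coordinator thread trips the thread check, while handing the call to the executor does not.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-ins for illustration; none of these are Flink classes. */
final class MainThreadSketch {

    /** A component that may only be touched by the thread it was bound to. */
    static final class SingleThreadedScheduler {
        private volatile Thread mainThread;
        private int failureCount;

        void bindToCurrentThread() {
            mainThread = Thread.currentThread();
        }

        void reportFailure() {
            assertRunningInMainThread();
            failureCount++;
        }

        int failureCount() {
            assertRunningInMainThread();
            return failureCount;
        }

        private void assertRunningInMainThread() {
            if (Thread.currentThread() != mainThread) {
                throw new AssertionError("violation of single-threaded property");
            }
        }
    }

    /** Returns true if a direct call from a foreign thread trips the thread check. */
    static boolean directCallCrashes(SingleThreadedScheduler scheduler) {
        AtomicBoolean crashed = new AtomicBoolean(false);
        Thread coordinatorThread = new Thread(() -> {
            try {
                scheduler.reportFailure();
            } catch (AssertionError e) {
                crashed.set(true);
            }
        });
        coordinatorThread.start();
        try {
            coordinatorThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return crashed.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            SingleThreadedScheduler scheduler = new SingleThreadedScheduler();
            CompletableFuture.runAsync(scheduler::bindToCurrentThread, mainThreadExecutor).get();

            // Unsafe: the coordinator thread calls the scheduler directly.
            System.out.println("direct call crashes: " + directCallCrashes(scheduler));

            // Safe: the call is handed over to the main thread executor.
            CompletableFuture.runAsync(scheduler::reportFailure, mainThreadExecutor).get();
            int count = CompletableFuture.supplyAsync(scheduler::failureCount, mainThreadExecutor).get();
            System.out.println("failures recorded on main thread: " + count);
        } finally {
            mainThreadExecutor.shutdown();
        }
    }
}
```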
Problem Level 1:
The solution would be to pass in the scheduler and a main thread executor to interact with it.
However, to do that the scheduler needs to be created before the OperatorCoordinators are created. One could do that by creating the Coordinators lazily after the Scheduler.
Problem Level 2:
The Scheduler restores the savepoints as part of the scheduler creation, when the ExecutionGraph and the CheckpointCoordinator are created early in the constructor.
(Side note: That design is tricky in itself, because it means state is restored before the scheduler is even properly constructed.)
That means the OperatorCoordinator (or a placeholder component) needs to exist to accept the restored state.
That brings us to a cyclic dependency:
- OperatorCoordinator (context) needs Scheduler and MainThreadExecutor
- Scheduler and MainThreadExecutor need constructed ExecutionGraph
- ExecutionGraph needs CheckpointCoordinator
- CheckpointCoordinator needs OperatorCoordinator
Breaking the Cycle
The only way we can do this is with a form of lazy initialization:
- We eagerly create the OperatorCoordinators so they exist for state restore
- We provide an uninitialized context to them
- When the Scheduler is started (after leadership is granted) we initialize the context with the (then readily constructed) Scheduler and MainThreadExecutor
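The three steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual implementation: `LazyInitializedContext`, `SchedulerLike`, `Coordinator`, and all method names are hypothetical, and `Runnable::run` stands in for the MainThreadExecutor only to keep the demo single-threaded.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

/** Hypothetical stand-ins for illustration; not Flink's actual classes. */
final class LazyContextSketch {

    /** The small slice of the scheduler that a coordinator context needs (assumed). */
    interface SchedulerLike {
        void handleGlobalFailure(Throwable cause);
    }

    /** Context that is handed out uninitialized and wired up once the scheduler exists. */
    static final class LazyInitializedContext {
        private SchedulerLike scheduler;
        private Executor mainThreadExecutor;

        /** Called when the scheduler is started, after leadership is granted. */
        synchronized void lazyInitialize(SchedulerLike scheduler, Executor mainThreadExecutor) {
            if (this.scheduler != null) {
                throw new IllegalStateException("Context was already initialized");
            }
            this.scheduler = Objects.requireNonNull(scheduler);
            this.mainThreadExecutor = Objects.requireNonNull(mainThreadExecutor);
        }

        /** Never calls the scheduler directly; always goes through the executor. */
        synchronized void failJob(Throwable cause) {
            if (scheduler == null) {
                throw new IllegalStateException("Context was not yet initialized");
            }
            SchedulerLike target = scheduler;
            mainThreadExecutor.execute(() -> target.handleGlobalFailure(cause));
        }
    }

    /** Created eagerly so that it can accept restored state before the scheduler is usable. */
    static final class Coordinator {
        private final LazyInitializedContext context;
        private byte[] restoredState = new byte[0];

        Coordinator(LazyInitializedContext context) {
            this.context = context;
        }

        void resetToCheckpoint(byte[] state) {
            restoredState = state.clone();
        }

        int restoredStateSize() {
            return restoredState.length;
        }

        void reportProblem(String message) {
            context.failJob(new RuntimeException(message));
        }
    }

    public static void main(String[] args) {
        // 1. Eager creation with an uninitialized context.
        LazyInitializedContext context = new LazyInitializedContext();
        Coordinator coordinator = new Coordinator(context);

        // 2. State restore works although no scheduler is attached yet.
        coordinator.resetToCheckpoint(new byte[] {1, 2, 3});

        // 3. Scheduler start: the context receives its real dependencies.
        context.lazyInitialize(
                cause -> System.out.println("global failure: " + cause.getMessage()),
                Runnable::run);

        coordinator.reportProblem("coordinator thread hit an error");
    }
}
```

The explicit "not yet initialized" check is one possible design choice: it turns a call made too early into an immediate, descriptive error instead of a NullPointerException somewhere inside the scheduler.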
Longer-term Solution
The longer-term solution would require a major change in the Scheduler and CheckpointCoordinator setup. Something like this:
- Scheduler (and ExecutionGraph) are constructed first
- JobMaster waits for leadership
- Upon leader grant, Operator Coordinators are constructed and can reference the Scheduler and FencedMainThreadExecutor
- CheckpointCoordinator is constructed and references ExecutionGraph and OperatorCoordinators
- Savepoint or latest checkpoint is restored
The implementation of the current solution should couple the parts as loosely as possible, to make it easy to implement the above approach later.
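As a rough sketch of the longer-term construction order, the snippet below wires heavily simplified, hypothetical stand-ins through plain constructor arguments. The class names mirror the components named above, but the constructors, fields, and the `onLeadershipGranted` method are assumptions for illustration, and the FencedMainThreadExecutor is left out for brevity. The point is only that each component is created after everything it references, so no lazy initialization is needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, heavily simplified stand-ins; not Flink's actual classes or constructors. */
final class ConstructionOrderSketch {

    static final class ExecutionGraph {}

    static final class Scheduler {
        final ExecutionGraph executionGraph = new ExecutionGraph();
    }

    static final class OperatorCoordinator {
        final Scheduler scheduler;
        String restoredState;

        OperatorCoordinator(Scheduler scheduler) {
            this.scheduler = scheduler;
        }
    }

    static final class CheckpointCoordinator {
        final ExecutionGraph executionGraph;
        final List<OperatorCoordinator> coordinators;

        CheckpointCoordinator(ExecutionGraph executionGraph, List<OperatorCoordinator> coordinators) {
            this.executionGraph = executionGraph;
            this.coordinators = coordinators;
        }

        void restore(String snapshot) {
            for (OperatorCoordinator coordinator : coordinators) {
                coordinator.restoredState = snapshot;
            }
        }
    }

    static final class JobMaster {
        // Scheduler (and ExecutionGraph) are constructed first.
        final Scheduler scheduler = new Scheduler();
        CheckpointCoordinator checkpointCoordinator;

        // Everything else is created only after leadership has been granted.
        void onLeadershipGranted(String latestSnapshot) {
            List<OperatorCoordinator> coordinators = new ArrayList<>();
            coordinators.add(new OperatorCoordinator(scheduler));

            checkpointCoordinator = new CheckpointCoordinator(
                    scheduler.executionGraph, Collections.unmodifiableList(coordinators));

            // Restore happens last, when all recipients of state already exist.
            checkpointCoordinator.restore(latestSnapshot);
        }
    }

    public static void main(String[] args) {
        JobMaster jobMaster = new JobMaster();
        jobMaster.onLeadershipGranted("snapshot-1");
        System.out.println(jobMaster.checkpointCoordinator.coordinators.get(0).restoredState);
    }
}
```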