Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-10740 FLIP-27: Refactor Source Interface
  3. FLINK-17781

OperatorCoordinator Context must support calls from thread other than JobMaster Main Thread

    XMLWordPrintableJSON

Details

    Description

      Currently, calls on the Context in the OperatorCoordinator go directly synchronously to the ExcutionGraph.

      There are two critical problems are:

      • It is common that the code in the OperatorCoordinator runs in a separate thread (for example, because it executes blocking operations). Calling the scheduler from another thread causes the Scheduler to crash (Assertion Error, violation of single threaded property)
      • Calls on the ExecutionGraph are removed as part of removing the legacy scheduler. Certain calls do not work any more.

      Problem Level 1:

      The solution would be to pass in the scheduler and a main thread executor to interact with it.

      However, to do that the scheduler needs to be created before the OperatorCoordinators are created. One could do that by creating the Coordinators lazily after the Scheduler.

      Problem Level 2:

      The Scheduler restores the savepoints as part of the scheduler creation, when the ExecutionGraph and the CheckpointCoordinator are created early in the constructor.
      (Side note: That design is tricky in itself, because it means state is restored before the scheduler is even properly constructed.)

      That means the OperatorCoordinator needs to exist (or an in placeholder component needs to exist) to accept the restored state.

      That brings us to a cyclic dependency:

      • OperatorCoordinator (context) needs Scheduler and MainThreadExecutor
      • Scheduler and MainThreadExecutor need constructed ExecutionGraph
      • ExecutionGraph needs CheckpointCoordinator
      • CheckpointCoordinator needs OperatorCoordinator

      Breaking the Cycle

      The only way we can do this is with a form of lazy initialization:

      • We eagerly create the OperatorCoordinators so they exist for state restore
      • We provide an uninitialized context to them
      • When the Scheduler is started (after leadership is granted) we initialize the context with the (then readily constructed) Scheduler and MainThreadExecutor

      Longer-term Solution

      The longer term solution would require a major change in the Scheduler and CheckpointCoordinator setup. Something like this:

      • Scheduler (and ExecutionGraph) are constructed first
      • JobMaster waits for leadership
      • Upon leader grant, Operator Coordinators are constructed and can reference the Scheduler and FencedMainThreadExecutor
      • CheckpointCoordinator is constructed and references ExecutionGraph and OperatorCoordinators
      • Savepoint or latest checkpoint is restored

      The implementation of the current should try to couple parts as loosely as possible to make it easy to implement the above approach later.

      Attachments

        Issue Links

          Activity

            People

              sewen Stephan Ewen
              sewen Stephan Ewen
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: