Flink / FLINK-10429

Redesign Flink Scheduling, introducing dedicated Scheduler component

Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: 1.7.0
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      This epic tracks the redesign of scheduling in Flink. Scheduling is currently a concern scattered across different components, mainly the ExecutionGraph/Execution and the SlotPool. Scheduling also happens only at the granularity of individual tasks, which makes holistic scheduling strategies hard to implement. In this epic we aim to introduce a dedicated Scheduler component that can support use cases such as auto-scaling, local recovery, and resource-optimized batch scheduling.

      The design for this feature is developed here: https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing

        Issue Links

          1. Make access to ExecutionGraph single threaded from JobMaster main thread (Sub-task, Closed, Stefan Richter)
          2. Extract scheduling-related code from Executions (Sub-task, Closed, Stefan Richter)
          3. Extract scheduling-related code from SlotPool (Sub-task, Closed, Stefan Richter)
          4. Introduce bulk/group-aware scheduling (Sub-task, Closed, Stefan Richter)
          5. Stepwise creation of the ExecutionGraph sub-structures (Sub-task, Closed, Stefan Richter)
          6. Introduce SchedulingStrategy interface (Sub-task, Closed, shuai.xu)
          7. Implement Eager Scheduling Strategy (Sub-task, Closed, shuai.xu)
          8. Implement Lazy Scheduling Strategy (Sub-task, Closed, BoWang)
          9. Remove getExecutionGraph() from JobMaster (Sub-task, Closed, Gary Yao)
          10. Introduce Scheduler interface and adapt ExecutionGraph to it (Sub-task, Closed, Gary Yao)
          11. Temporarily disable CLI command for rescaling (Sub-task, Closed, Gary Yao)
          12. Add CLI command for rescaling (Sub-task, Closed, Unassigned)
          13. Implement ExecutionSlotAllocator (Sub-task, Closed, shuai.xu)
          14. Implement ExecutionFailureHandler (Sub-task, Closed, Zhu Zhu)
          15. Implement ExecutionGraph to SchedulingTopology Adapter (Sub-task, Closed, BoWang)
          16. Add SchedulerNG Stub Implementation (Sub-task, Closed, Gary Yao)
          17. Implement DefaultScheduler stub (Sub-task, Closed, Gary Yao)
          18. Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology (Sub-task, Closed, BoWang)
          19. Implement FixedDelayRestartBackoffTimeStrategy (Sub-task, Closed, BoWang)
          20. Implement FailureRateRestartBackoffTimeStrategy (Sub-task, Closed, BoWang)
          21. Implement RestartBackoffTimeStrategyFactoryLoader (Sub-task, Closed, Zhu Zhu)
          22. Implement ExecutionGraph to InputsLocationsRetriever Adapter (Sub-task, Closed, shuai.xu)
          23. Facilitate enabling new Scheduler in MiniCluster Tests (Sub-task, Closed, Zhu Zhu)
          24. Enable NG scheduler testing in per-commit tests (Sub-task, Closed, Zhu Zhu)
          25. Rewrite DefaultExecutionSlotAllocator to use SlotProviderStrategy (Sub-task, Closed, Gary Yao)
          26. All task state changes should be notified to SchedulingStrategy (SchedulerNG) (Sub-task, Closed, Unassigned)
          27. Remove the description of restart strategy customization (Sub-task, Closed, Zhu Zhu)
          28. Support configurable failover strategy for scheduler NG (Sub-task, Closed, Zhu Zhu)
          29. Unify SchedulerOperations#allocateSlotsAndDeploy implementation for all scheduling strategies (Sub-task, Closed, Zhu Zhu)
          30. Support global failure handling for DefaultScheduler (SchedulerNG) (Sub-task, Closed, Zhu Zhu)
          31. All partition consumable events should be notified to SchedulingStrategy (SchedulerNG) (Sub-task, Closed, Zhu Zhu)
          32. Make LazyFromSourcesSchedulingStrategy do lazy scheduling based on partition state only (Sub-task, Closed, Unassigned)
          33. Use correct RestartBackoffTimeStrategy in new DefaultScheduler (Sub-task, Closed, Gary Yao)
          34. Annotate all MiniCluster tests in flink-runtime with AlsoRunWithSchedulerNG (Sub-task, Closed, Zhu Zhu)
          35. Use NoResourceAvailableException to wrap TimeoutException on slot allocation (Scheduler NG) (Sub-task, Closed, Zhu Zhu)
          36. LazyFromSourcesSchedulingStrategy should be able to restart terminated tasks (Sub-task, Closed, Zhu Zhu)
          37. Add unit tests for DefaultScheduler to test concurrent failover behavior (Sub-task, Closed, Gary Yao)
          38. Remove legacy RestartPipelinedRegionStrategy (Sub-task, Closed, Zhu Zhu)
          39. Get ExecutionVertexID from ExecutionVertex rather than creating new instances (Sub-task, Closed, Zhu Zhu)
          40. Introduce a unified topology interface (Sub-task, Resolved, Zhu Zhu)
          41. Change DefaultSchedulingResultPartition to return correct partition state (Sub-task, Closed, Zhu Zhu)
          42. Reset vertices right after they transition to terminated states (Sub-task, Closed, Zhu Zhu)
          43. Prevent vertex from being affected by outdated deployment (SchedulerNG) (Sub-task, Closed, Zhu Zhu)
          44. Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG (Sub-task, Resolved, Zhu Zhu)
          45. Annotate MiniCluster tests in flink-tests with AlsoRunWithSchedulerNG (Sub-task, Resolved, Zhu Zhu)
          46. Enable ClassLoaderITCase to pass with scheduler NG (Sub-task, Closed, Zhu Zhu)
          47. Enable KeyedStateCheckpointingITCase to pass with scheduler NG (Sub-task, Closed, Gary Yao)
          48. Enable ZooKeeperHighAvailabilityITCase to pass with scheduler NG (Sub-task, Closed, Zhu Zhu)
          49. Enable RegionFailoverITCase to pass with scheduler NG (Sub-task, Closed, Gary Yao)
          50. Avoid to notify ineffective state updates to scheduler (Sub-task, Closed, Zhu Zhu)
          51. Restore task state in new DefaultScheduler (Sub-task, Closed, Zhu Zhu)
          52. RestartPipelinedRegionStrategy leverage tracked partition availability for better failover experience in DefaultScheduler (Sub-task, Closed, Zhu Zhu)
          53. Enable BatchFineGrainedRecoveryITCase to pass with scheduler NG (Sub-task, Closed, Zhu Zhu)
          54. Refactor SchedulingTopology to extend base topology (Sub-task, Resolved, Zhu Zhu)
          55. Refactor FailoverTopology to extend base topology (Sub-task, Resolved, Zhu Zhu)
          56. Keep only one execution topology in scheduler (Sub-task, Resolved, Zhu Zhu)
          57. Enable AbstractTaskManagerProcessFailureRecoveryTest to pass with new DefaultScheduler (Sub-task, Closed, Zhu Zhu)
          58. Support building pipelined regions from base topology (Sub-task, Resolved, Zhu Zhu)
          59. Add a metric to show failover count regarding fine grained recovery (Sub-task, Closed, Zhu Zhu)
          60. Simplify params of Execution#processFail (Sub-task, Closed, Unassigned)
          61. Enable YARNHighAvailabilityITCase to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
          62. Handle schedule mode LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST correctly in DefaultScheduler (Sub-task, Closed, Gary Yao)
          63. Enable KafkaConsumerTestBase#runFailOnNoBrokerTest to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
          64. Introduce full restarts failover strategy for NG scheduler (Sub-task, Closed, Zhu Zhu)
          65. Avoid leaking instance of DefaultScheduler before object is constructed (Sub-task, Closed, Gary Yao)
          66. Enable 'Queryable state (rocksdb) with TM restart' E2E test to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
          67. Enable 'Streaming File Sink end-to-end test' to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
          68. Enable 'Streaming bucketing end-to-end test' to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
          69. Avoid leaking unassigned Slot in DefaultScheduler when Deployment is outdated (Sub-task, Closed, Gary Yao)
          70. Let tasks in a batch get scheduled in topological order and subtaskIndex ascending pattern (Sub-task, Closed, Zhu Zhu)
          71. Set default value of config option jobmanager.scheduler to "ng" (Sub-task, Closed, Gary Yao)
          72. SchedulerBase should only log the RestartStrategy in legacy scheduling mode (Sub-task, Closed, Gary Yao)

            People

              Assignee: Unassigned
              Reporter: Stefan Richter (srichter)
              Votes: 3
              Watchers: 48

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 23h 10m