Flink / FLINK-10429

Redesign Flink Scheduling, introducing dedicated Scheduler component

Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: 1.7.0
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      This epic tracks the redesign of scheduling in Flink. Scheduling is currently a concern scattered across different components, mainly the ExecutionGraph/Execution and the SlotPool. Scheduling also happens only at the granularity of individual tasks, which makes holistic scheduling strategies hard to implement. In this epic we aim to introduce a dedicated Scheduler component that can support use cases like auto-scaling, local recovery, and resource-optimized batch.

      The design for this feature is developed here: https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing
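
      As a rough illustration of the idea described above, the sketch below separates "what to deploy and when" (a pluggable strategy that sees the whole topology) from the component that carries out deployment. All names here (Topology, EagerStrategy, LazyFromSourcesStrategy, Scheduler, and their methods) are hypothetical and chosen for this example; they are not Flink's actual interfaces, and the "lazy" rule is simplified to "deploy a task once all of its upstream tasks have finished".

```python
from dataclasses import dataclass
from typing import Dict, List, Protocol, Set


@dataclass
class Topology:
    """Hypothetical job graph: task name -> names of the upstream tasks it reads from."""
    inputs: Dict[str, List[str]]

    def tasks(self) -> List[str]:
        return list(self.inputs)


class SchedulingStrategy(Protocol):
    """Illustrative strategy contract (an assumption, not Flink's API)."""

    def start(self, topology: Topology) -> List[str]: ...

    def on_task_finished(
        self, topology: Topology, finished: Set[str], deployed: Set[str]
    ) -> List[str]: ...


class EagerStrategy:
    """Deploys every task up front, as a streaming job would need."""

    def start(self, topology: Topology) -> List[str]:
        return topology.tasks()

    def on_task_finished(self, topology, finished, deployed) -> List[str]:
        return []  # nothing left to deploy


class LazyFromSourcesStrategy:
    """Deploys sources first, then each task once all its inputs have finished."""

    def start(self, topology: Topology) -> List[str]:
        return [t for t, ups in topology.inputs.items() if not ups]

    def on_task_finished(self, topology, finished, deployed) -> List[str]:
        return [
            t
            for t, ups in topology.inputs.items()
            if t not in deployed and all(u in finished for u in ups)
        ]


class Scheduler:
    """Single place that owns scheduling decisions and delegates them to a strategy."""

    def __init__(self, topology: Topology, strategy: SchedulingStrategy):
        self.topology = topology
        self.strategy = strategy
        self.deployed: List[str] = []
        self.finished: Set[str] = set()

    def start_scheduling(self) -> None:
        self.deployed.extend(self.strategy.start(self.topology))

    def task_finished(self, task: str) -> None:
        self.finished.add(task)
        newly_ready = self.strategy.on_task_finished(
            self.topology, self.finished, set(self.deployed)
        )
        self.deployed.extend(newly_ready)
```

      Because the strategy receives the whole topology rather than a single task, swapping EagerStrategy for LazyFromSourcesStrategy changes the deployment order without touching the Scheduler itself.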

      Attachments

        Issue Links

        1. Make access to ExecutionGraph single threaded from JobMaster main thread (Sub-task, Closed, Stefan Richter)
        2. Extract scheduling-related code from Executions (Sub-task, Closed, Stefan Richter)
        3. Extract scheduling-related code from SlotPool (Sub-task, Closed, Stefan Richter)
        4. Introduce bulk/group-aware scheduling (Sub-task, Closed, Stefan Richter)
        5. Stepwise creation of the ExecutionGraph sub-structures (Sub-task, Closed, Stefan Richter)
        6. Introduce SchedulingStrategy interface (Sub-task, Closed, shuai.xu)
        7. Implement Eager Scheduling Strategy (Sub-task, Closed, shuai.xu)
        8. Implement Lazy Scheduling Strategy (Sub-task, Closed, BoWang)
        9. Remove getExecutionGraph() from JobMaster (Sub-task, Closed, Gary Yao)
        10. Introduce Scheduler interface and adapt ExecutionGraph to it (Sub-task, Closed, Gary Yao)
        11. Temporarily disable CLI command for rescaling (Sub-task, Closed, Gary Yao)
        12. Add CLI command for rescaling (Sub-task, Closed, Unassigned)
        13. Implement ExecutionSlotAllocator (Sub-task, Closed, shuai.xu)
        14. Implement ExecutionFailureHandler (Sub-task, Closed, Zhu Zhu)
        15. Implement ExecutionGraph to SchedulingTopology Adapter (Sub-task, Closed, BoWang)
        16. Add SchedulerNG Stub Implementation (Sub-task, Closed, Gary Yao)
        17. Implement DefaultScheduler stub (Sub-task, Closed, Gary Yao)
        18. Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology (Sub-task, Closed, BoWang)
        19. Implement FixedDelayRestartBackoffTimeStrategy (Sub-task, Closed, BoWang)
        20. Implement FailureRateRestartBackoffTimeStrategy (Sub-task, Closed, BoWang)
        21. Implement RestartBackoffTimeStrategyFactoryLoader (Sub-task, Closed, Zhu Zhu)
        22. Implement ExecutionGraph to InputsLocationsRetriever Adapter (Sub-task, Closed, shuai.xu)
        23. Facilitate enabling new Scheduler in MiniCluster Tests (Sub-task, Closed, Zhu Zhu)
        24. Enable NG scheduler testing in per-commit tests (Sub-task, Closed, Zhu Zhu)
        25. Rewrite DefaultExecutionSlotAllocator to use SlotProviderStrategy (Sub-task, Closed, Gary Yao)
        26. All task state changes should be notified to SchedulingStrategy (SchedulerNG) (Sub-task, Closed, Unassigned)
        27. Remove the description of restart strategy customization (Sub-task, Closed, Zhu Zhu)
        28. Support configurable failover strategy for scheduler NG (Sub-task, Closed, Zhu Zhu)
        29. Unify SchedulerOperations#allocateSlotsAndDeploy implementation for all scheduling strategies (Sub-task, Closed, Zhu Zhu)
        30. Support global failure handling for DefaultScheduler (SchedulerNG) (Sub-task, Closed, Zhu Zhu)
        31. All partition consumable events should be notified to SchedulingStrategy (SchedulerNG) (Sub-task, Closed, Zhu Zhu)
        32. Make LazyFromSourcesSchedulingStrategy do lazy scheduling based on partition state only (Sub-task, Closed, Unassigned)
        33. Use correct RestartBackoffTimeStrategy in new DefaultScheduler (Sub-task, Closed, Gary Yao)
        34. Annotate all MiniCluster tests in flink-runtime with AlsoRunWithSchedulerNG (Sub-task, Closed, Zhu Zhu)
        35. Use NoResourceAvailableException to wrap TimeoutException on slot allocation (Scheduler NG) (Sub-task, Closed, Zhu Zhu)
        36. LazyFromSourcesSchedulingStrategy should be able to restart terminated tasks (Sub-task, Closed, Zhu Zhu)
        37. Add unit tests for DefaultScheduler to test concurrent failover behavior (Sub-task, Closed, Gary Yao)
        38. Remove legacy RestartPipelinedRegionStrategy (Sub-task, Closed, Zhu Zhu)
        39. Get ExecutionVertexID from ExecutionVertex rather than creating new instances (Sub-task, Closed, Zhu Zhu)
        40. Introduce a unified topology interface (Sub-task, Resolved, Zhu Zhu)
        41. Change DefaultSchedulingResultPartition to return correct partition state (Sub-task, Closed, Zhu Zhu)
        42. Reset vertices right after they transition to terminated states (Sub-task, Closed, Zhu Zhu)
        43. Prevent vertex from being affected by outdated deployment (SchedulerNG) (Sub-task, Closed, Zhu Zhu)
        44. Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG (Sub-task, Resolved, Zhu Zhu)
        45. Annotate MiniCluster tests in flink-tests with AlsoRunWithSchedulerNG (Sub-task, Resolved, Zhu Zhu)
        46. Enable ClassLoaderITCase to pass with scheduler NG (Sub-task, Closed, Zhu Zhu)
        47. Enable KeyedStateCheckpointingITCase to pass with scheduler NG (Sub-task, Closed, Gary Yao)
        48. Enable ZooKeeperHighAvailabilityITCase to pass with scheduler NG (Sub-task, Closed, Zhu Zhu)
        49. Enable RegionFailoverITCase to pass with scheduler NG (Sub-task, Closed, Gary Yao)
        50. Avoid to notify ineffective state updates to scheduler (Sub-task, Closed, Zhu Zhu)
        51. Restore task state in new DefaultScheduler (Sub-task, Closed, Zhu Zhu)
        52. RestartPipelinedRegionStrategy leverage tracked partition availability for better failover experience in DefaultScheduler (Sub-task, Closed, Zhu Zhu)
        53. Enable BatchFineGrainedRecoveryITCase to pass with scheduler NG (Sub-task, Closed, Zhu Zhu)
        54. Refactor SchedulingTopology to extend base topology (Sub-task, Resolved, Zhu Zhu)
        55. Refactor FailoverTopology to extend base topology (Sub-task, Resolved, Zhu Zhu)
        56. Keep only one execution topology in scheduler (Sub-task, Resolved, Zhu Zhu)
        57. Enable AbstractTaskManagerProcessFailureRecoveryTest to pass with new DefaultScheduler (Sub-task, Closed, Zhu Zhu)
        58. Support building pipelined regions from base topology (Sub-task, Resolved, Zhu Zhu)
        59. Add a metric to show failover count regarding fine grained recovery (Sub-task, Closed, Zhu Zhu)
        60. Simplify params of Execution#processFail (Sub-task, Closed, Unassigned)
        61. Enable YARNHighAvailabilityITCase to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
        62. Handle schedule mode LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST correctly in DefaultScheduler (Sub-task, Closed, Gary Yao)
        63. Enable KafkaConsumerTestBase#runFailOnNoBrokerTest to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
        64. Introduce full restarts failover strategy for NG scheduler (Sub-task, Closed, Zhu Zhu)
        65. Avoid leaking instance of DefaultScheduler before object is constructed (Sub-task, Closed, Gary Yao)
        66. Enable 'Queryable state (rocksdb) with TM restart' E2E test to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
        67. Enable 'Streaming File Sink end-to-end test' to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
        68. Enable 'Streaming bucketing end-to-end test' to pass with new DefaultScheduler (Sub-task, Closed, Gary Yao)
        69. Avoid leaking unassigned Slot in DefaultScheduler when Deployment is outdated (Sub-task, Closed, Gary Yao)
        70. Let tasks in a batch get scheduled in topological order and subtaskIndex ascending pattern (Sub-task, Closed, Zhu Zhu)
        71. Set default value of config option jobmanager.scheduler to "ng" (Sub-task, Closed, Gary Yao)
        72. SchedulerBase should only log the RestartStrategy in legacy scheduling mode (Sub-task, Closed, Gary Yao)

          People

            Assignee: Unassigned
            Reporter: Stefan Richter (srichter)
            Votes: 3
            Watchers: 48

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 23h 10m