  Flink / FLINK-10653

Introduce Pluggable Shuffle Service Architecture


    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.9.0
    • Component/s: Runtime / Network
    • Labels:
      None

      Description

      This is the umbrella issue for improving the shuffle architecture.

      Shuffle is the process of transferring data between stages: the sender side writes outputs and the receiver side reads them. In the Flink implementation, shuffle consists of three separate parts (the writer, the transport layer and the reader), and these parts are unified for both streaming and batch jobs.
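The three parts can be pictured as a tiny in-memory pipeline. This is a hypothetical simplification, not Flink code: `Transport`, `InMemoryTransport`, `Writer` and `Reader` are illustrative names, and a plain FIFO queue stands in for the network transport layer so the example runs in a single JVM.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the three shuffle parts; none of these types are Flink classes.
final class ShufflePipelineSketch {

    // Transport layer: moves serialized buffers from the sender to the receiver.
    interface Transport {
        void send(byte[] buffer);

        byte[] poll(); // returns null when no buffer is pending
    }

    // A FIFO queue stands in for the network so the sketch runs in one JVM.
    static final class InMemoryTransport implements Transport {
        private final Deque<byte[]> queue = new ArrayDeque<>();

        @Override
        public void send(byte[] buffer) {
            queue.addLast(buffer);
        }

        @Override
        public byte[] poll() {
            return queue.pollFirst();
        }
    }

    // Writer (sender side): serializes records and hands them to the transport.
    static final class Writer {
        private final Transport transport;

        Writer(Transport transport) {
            this.transport = transport;
        }

        void emit(String record) {
            transport.send(record.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Reader (receiver side): pulls buffers from the transport and deserializes them.
    static final class Reader {
        private final Transport transport;

        Reader(Transport transport) {
            this.transport = transport;
        }

        List<String> drain() {
            List<String> records = new ArrayList<>();
            byte[] buffer;
            while ((buffer = transport.poll()) != null) {
                records.add(new String(buffer, StandardCharsets.UTF_8));
            }
            return records;
        }
    }

    public static void main(String[] args) {
        Transport transport = new InMemoryTransport();
        Writer writer = new Writer(transport);
        Reader reader = new Reader(transport);
        writer.emit("a");
        writer.emit("b");
        System.out.println(reader.drain()); // prints [a, b]
    }
}
```

In this sketch the writer and reader only meet through the `Transport` contract; nothing ties a particular writer to the reader that understands its output, which is the gap the rest of the description is concerned with.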

      In detail, the current ResultPartitionWriter interface on the upstream side only supports in-memory outputs for streaming jobs and local persistent file outputs for batch jobs. If we implement another writer based on the ResultPartitionWriter interface, such as a DfsWriter, RdmaWriter or SortMergeWriter, there is no unified mechanism to extend the reader side accordingly.

      To make the shuffle architecture more flexible and support more scenarios, especially for batch jobs, a high-level shuffle architecture is needed that manages and extends the writer and reader sides together.
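One way to picture managing both sides together is a single plugin object that creates the writer and its matching reader, so a new writer cannot be added without the reader that understands its output. The following is a minimal sketch under that assumption only: `ShuffleServicePlugin`, the `unchecked` helper, the registry and the `"in-memory"` / `"local-file"` keys are hypothetical names, not Flink's API (Flink's actual split into ShuffleMaster and ShuffleEnvironment is introduced by the sub-tasks listed under Issue Links).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: one plugin creates both the writer and its matching reader.
final class PluggableShuffleSketch {

    interface PartitionWriter { void write(String partitionId, String data); }

    // Returns "" when nothing was written to the partition.
    interface PartitionReader { String read(String partitionId); }

    // The plugin owns both sides, so they always agree on where and how data is stored.
    interface ShuffleServicePlugin {
        PartitionWriter createWriter();
        PartitionReader createReader();
    }

    // Helper that rethrows checked IOExceptions as UncheckedIOException.
    interface IoSupplier<T> { T get() throws IOException; }

    static <T> T unchecked(IoSupplier<T> action) {
        try {
            return action.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Streaming-style plugin: both sides share an in-memory map.
    static final class InMemoryShuffle implements ShuffleServicePlugin {
        private final Map<String, StringBuilder> store = new HashMap<>();

        @Override
        public PartitionWriter createWriter() {
            return (id, data) -> store.computeIfAbsent(id, k -> new StringBuilder()).append(data);
        }

        @Override
        public PartitionReader createReader() {
            return id -> store.containsKey(id) ? store.get(id).toString() : "";
        }
    }

    // Batch-style plugin: one local file per partition (temp files are not cleaned up here).
    static final class LocalFileShuffle implements ShuffleServicePlugin {
        private final Path dir = unchecked(() -> Files.createTempDirectory("shuffle-sketch"));

        @Override
        public PartitionWriter createWriter() {
            // CREATE + APPEND: create the partition file on first write, then append to it.
            return (id, data) -> unchecked(() -> Files.write(dir.resolve(id),
                    data.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.APPEND));
        }

        @Override
        public PartitionReader createReader() {
            return id -> {
                Path file = dir.resolve(id);
                if (!Files.exists(file)) {
                    return "";
                }
                return unchecked(() -> new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            };
        }
    }

    // Registry keyed by a configuration value; the key names are made up for this sketch.
    private static final Map<String, Supplier<ShuffleServicePlugin>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("in-memory", InMemoryShuffle::new);
        REGISTRY.put("local-file", LocalFileShuffle::new);
    }

    static ShuffleServicePlugin load(String name) {
        Supplier<ShuffleServicePlugin> factory = REGISTRY.get(name);
        if (factory == null) throw new IllegalArgumentException("Unknown shuffle service: " + name);
        return factory.get();
    }

    public static void main(String[] args) {
        for (String name : new String[] {"in-memory", "local-file"}) {
            ShuffleServicePlugin shuffle = load(name);
            PartitionWriter writer = shuffle.createWriter();
            PartitionReader reader = shuffle.createReader();
            writer.write("p0", "hello ");
            writer.write("p0", "world");
            System.out.println(name + ": " + reader.read("p0")); // prints "<name>: hello world"
        }
    }
}
```

Under this design, adding a hypothetical DFS or RDMA variant would mean registering one more plugin that supplies both sides, rather than adding a writer and separately patching every reader.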

      Refer to the design doc for more details.

        Attachments

        Issue Links

        1. Introduce ResultPartitionWithConsumableNotifier in task for notifying consumable result partition | Sub-task | Resolved | Zhijiang
        2. Remove the schedule mode property from RPDD to TDD | Sub-task | Closed | Zhijiang
        3. Introduce ShuffleMaster in Job Master | Sub-task | Resolved | Andrey Zagrebin
        4. Introduce ShuffleEnvironment in Task Executor | Sub-task | Resolved | Andrey Zagrebin
        5. Activate default shuffle implementation and remove legacy code | Sub-task | Closed | Zhijiang
        6. Extend the necessary methods in ResultPartitionWriter interface | Sub-task | Closed | Zhijiang
        7. Make ResultPartitionWriter extend AutoCloseable | Sub-task | Closed | Zhijiang
        8. Make InputGate interface extend AutoCloseable | Sub-task | Closed | Zhijiang
        9. Remove IOMode from NetworkEnvironment | Sub-task | Closed | Zhijiang
        10. Remove KvState related components from NetworkEnvironment | Sub-task | Closed | Zhijiang
        11. Refactor the creation of ResultPartition and InputGate into NetworkEnvironment | Sub-task | Closed | Zhijiang
        12. Replace IntermediateResultPartitionID with ResultPartitionID in ResultPartitionDeploymentDescriptor | Sub-task | Closed | Zhijiang
        13. Refactor to simplify the process of scheduleOrUpdateConsumers | Sub-task | Resolved | Zhijiang
        14. Refactor the constructor of NetworkEnvironment | Sub-task | Closed | Zhijiang
        15. Abstract TaskEventPublisher interface for simplifying NetworkEnvironment | Sub-task | Closed | Zhijiang
        16. Move network related options to NetworkEnvironmentOptions | Sub-task | Closed | Zhijiang
        17. Remove legacy fields for SingleInputGate | Sub-task | Closed | Zhijiang
        18. Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService | Sub-task | Closed | Zhijiang
        19. Introduce partition/gate setup to decouple task registration with NetworkEnvironment | Sub-task | Closed | Andrey Zagrebin
        20. Refactor IOMetrics to not distinguish between local/remote in/out bytes | Sub-task | Closed | Zhijiang
        21. Introduce InputGateWithMetrics in Task to increment numBytesIn metric | Sub-task | Closed | Zhijiang
        22. Consider introducing batch metric register in NetworkEnviroment | Sub-task | Closed | Zhijiang
        23. Refactor ResultPartitionManager to break tie with Task | Sub-task | Resolved | Andrey Zagrebin
        24. Move network metrics setup into NetworkEnvironment | Sub-task | Closed | Zhijiang
        25. Refactor the start method of ConnectionManager | Sub-task | Closed | Zhijiang
        26. Move Task.inputGatesById to NetworkEnvironment | Sub-task | Closed | Andrey Zagrebin
        27. Introduce an encapsulated metric group layout for shuffle API and deprecate old one | Sub-task | Closed | Andrey Zagrebin
        28. Remove getBufferProvider method from ResultPartitionWriter interface | Sub-task | Closed | Zhijiang
        29. Switch Task from ResultPartition to ResultPartitionWriter interface | Sub-task | Closed | Andrey Zagrebin
        30. Make NetworkEnvironment#start() return the binded data port | Sub-task | Closed | Zhijiang
        31. Remove getOwningTaskName method from InputGate | Sub-task | Resolved | Zhijiang
        32. Refactor abstract InputGate to general interface | Sub-task | Closed | Zhijiang
        33. Introduce NetworkEnvironment.getUnreleasedPartitions instead of using getResultPartitionManager | Sub-task | Closed | Andrey Zagrebin
        34. Introduce ShuffleService interface and its configuration | Sub-task | Resolved | Andrey Zagrebin
        35. Make shuffle environment implementation independent with IOManager | Sub-task | Closed | Zhijiang
        36. Remove abstract getPageSize method from InputGate | Sub-task | Resolved | Zhijiang
        37. Remove ExecutionAttemptID argument from ResultPartitionFactory#create | Sub-task | Closed | Zhijiang
        38. Add partition lifecycle related Shuffle API | Sub-task | Closed | Andrey Zagrebin
        39. Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes | Sub-task | Closed | Andrey Zagrebin
        40. Refactor the process of SchedulerNG#requestPartitionState | Sub-task | Closed | Zhijiang
        41. Remove getBufferSize method from BufferPoolFactory | Sub-task | Closed | Zhijiang
        42. Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type | Sub-task | Closed | Andrey Zagrebin
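Several of these sub-tasks split the shuffle API into a job-master side (ShuffleMaster), a task-executor side (ShuffleEnvironment) and a descriptor handed from one to the other (ShuffleDescriptor), plus partition lifecycle and release handling. The sketch below assumes a greatly simplified version of that flow: the master registers a produced partition and returns a descriptor carrying the producer's location, and a consumer on another executor finds the data using only that descriptor. `Master`, `Environment`, `Descriptor` and the `cluster` map (which stands in for the network) are hypothetical; Flink's real interfaces have different and richer signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the master / executor / descriptor split; not Flink's API.
final class ShuffleMasterEnvironmentSketch {

    // Descriptor: everything a consumer needs to locate a produced partition.
    static final class Descriptor {
        final String partitionId;
        final String producerAddress;

        Descriptor(String partitionId, String producerAddress) {
            this.partitionId = partitionId;
            this.producerAddress = producerAddress;
        }
    }

    // Master side (job master): registers partitions and hands out descriptors.
    static final class Master {
        private final Map<String, Descriptor> registered = new HashMap<>();

        Descriptor registerPartition(String partitionId, String producerAddress) {
            Descriptor descriptor = new Descriptor(partitionId, producerAddress);
            registered.put(partitionId, descriptor);
            return descriptor;
        }

        boolean isRegistered(String partitionId) {
            return registered.containsKey(partitionId);
        }

        void releasePartition(String partitionId) {
            registered.remove(partitionId);
        }
    }

    // Environment side (task executor): hosts produced data and reads from producers.
    static final class Environment {
        final String address;
        private final Map<String, StringBuilder> partitions = new HashMap<>();

        Environment(String address) {
            this.address = address;
        }

        // Producer: only the executor named in the descriptor may write the partition.
        void write(Descriptor d, String data) {
            if (!d.producerAddress.equals(address)) {
                throw new IllegalStateException("Partition " + d.partitionId + " is not hosted on " + address);
            }
            partitions.computeIfAbsent(d.partitionId, k -> new StringBuilder()).append(data);
        }

        // Consumer: locates the producer using only the descriptor; the map stands in for the network.
        String read(Descriptor d, Map<String, Environment> cluster) {
            Environment producer = cluster.get(d.producerAddress);
            if (producer == null) {
                throw new IllegalStateException("Unknown producer " + d.producerAddress);
            }
            StringBuilder data = producer.partitions.get(d.partitionId);
            return data == null ? "" : data.toString();
        }

        void releasePartition(String partitionId) {
            partitions.remove(partitionId);
        }
    }

    public static void main(String[] args) {
        Map<String, Environment> cluster = new HashMap<>();
        Environment tm1 = new Environment("tm-1");
        Environment tm2 = new Environment("tm-2");
        cluster.put(tm1.address, tm1);
        cluster.put(tm2.address, tm2);

        Master master = new Master();
        Descriptor d = master.registerPartition("p0", tm1.address); // producer runs on tm-1
        tm1.write(d, "records");
        System.out.println(tm2.read(d, cluster)); // consumer on tm-2 prints "records"

        // Release on both sides once the partition is no longer needed.
        master.releasePartition("p0");
        tm1.releasePartition("p0");
        System.out.println(master.isRegistered("p0")); // prints false
    }
}
```

Keeping the location inside the descriptor is what lets the consumer side stay ignorant of how the producer side stored the data, which is the decoupling the umbrella issue asks for.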


            People

            • Assignee: Zhijiang (zjwang)
            • Reporter: Zhijiang (zjwang)

              Dates

              • Created:
                Updated:
                Resolved:

                Time Tracking

                • Estimated: Not Specified
                • Remaining: 0h
                • Logged: 13h 50m
