Flink / FLINK-10653

Introduce Pluggable Shuffle Manager Architecture

    Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Runtime / Network
    • Labels: None

      Description

      This is the umbrella issue for improving shuffle architecture.

      Shuffle is the process of transferring data between stages: the sender side writes outputs and the receiver side reads them. In Flink's implementation, it consists of three separate parts (writer, transport layer, and reader), which are shared by streaming and batch jobs.

      In detail, the current ResultPartitionWriter interface on the upstream side supports only in-memory outputs for streaming jobs and local persistent file outputs for batch jobs. If we implement another writer, such as DfsWriter, RdmaWriter, or SortMergeWriter, on top of the ResultPartitionWriter interface, there is no unified mechanism to extend the reader side accordingly.

      To make the shuffle architecture more flexible and support more scenarios, especially for batch jobs, a high-level shuffle architecture is needed that manages and extends the writer and reader sides together.

      Refer to the design doc for more details.
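
      As a rough illustration of the idea above, the sketch below pairs the writer and the reader behind one pluggable service, so that a new writer cannot be added without its matching reader. All type names here (SketchShuffleService, SketchWriter, SketchReader, InMemoryShuffleService) are hypothetical and are not Flink's actual API; the design doc remains the authoritative description.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch only: none of these types are Flink's actual API. */
final class PluggableShuffleSketch {

    /** Writer side: the producer emits records into a partition. */
    interface SketchWriter extends AutoCloseable {
        void emit(String record);

        @Override
        void close();
    }

    /** Reader side: the consumer reads back everything written to a partition. */
    interface SketchReader {
        List<String> readAll();
    }

    /** One plug-in supplies BOTH sides, so writer and reader are extended together. */
    interface SketchShuffleService {
        SketchWriter createWriter(String partitionId);

        SketchReader createReader(String partitionId);
    }

    /** Toy in-memory implementation; a DFS- or RDMA-backed one would implement the same interface. */
    static final class InMemoryShuffleService implements SketchShuffleService {
        private final Map<String, List<String>> partitions = new HashMap<>();

        @Override
        public SketchWriter createWriter(String partitionId) {
            List<String> buffer = partitions.computeIfAbsent(partitionId, id -> new ArrayList<>());
            return new SketchWriter() {
                @Override
                public void emit(String record) {
                    buffer.add(record);
                }

                @Override
                public void close() {
                    // Nothing to release for an in-memory partition.
                }
            };
        }

        @Override
        public SketchReader createReader(String partitionId) {
            // Return a copy so that readers cannot mutate the stored partition.
            return () -> new ArrayList<>(partitions.getOrDefault(partitionId, new ArrayList<>()));
        }
    }

    public static void main(String[] args) {
        SketchShuffleService service = new InMemoryShuffleService();
        try (SketchWriter writer = service.createWriter("p0")) {
            writer.emit("a");
            writer.emit("b");
        }
        System.out.println(service.createReader("p0").readAll());
    }
}
```

      Swapping InMemoryShuffleService for another implementation changes both sides at once, which is the property the current writer-only extension point lacks.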

          Sub-Tasks

          No. | Summary | Type | Status | Assignee
          1. Introduce ResultPartitionWithConsumableNotifier in task for notifying consumable result partition | Sub-task | Resolved | zhijiang
          2. Remove the schedule mode property from RPDD to TDD | Sub-task | Closed | zhijiang
          3. Introduce ShuffleMaster in Job Master | Sub-task | Resolved | Andrey Zagrebin
          4. Introduce ShuffleEnvironment in Task Executor | Sub-task | Resolved | Andrey Zagrebin
          5. Activate default shuffle implementation and remove legacy code | Sub-task | Closed | zhijiang
          6. Extend the necessary methods in ResultPartitionWriter interface | Sub-task | Closed | zhijiang
          7. Make ResultPartitionWriter extend AutoCloseable | Sub-task | Closed | zhijiang
          8. Make InputGate interface extend AutoCloseable | Sub-task | Closed | zhijiang
          9. Remove IOMode from NetworkEnvironment | Sub-task | Closed | zhijiang
          10. Remove KvState related components from NetworkEnvironment | Sub-task | Closed | zhijiang
          11. Refactor the creation of ResultPartition and InputGate into NetworkEnvironment | Sub-task | Closed | zhijiang
          12. Replace IntermediateResultPartitionID with ResultPartitionID in ResultPartitionDeploymentDescriptor | Sub-task | Closed | zhijiang
          13. Refactor to simplify the process of scheduleOrUpdateConsumers | Sub-task | Resolved | zhijiang
          14. Refactor the constructor of NetworkEnvironment | Sub-task | Closed | zhijiang
          15. Abstract TaskEventPublisher interface for simplifying NetworkEnvironment | Sub-task | Closed | zhijiang
          16. Introduce API point for ShuffleEnvironment to get estimation of locally required memory | Sub-task | Open | zhijiang
          17. Move network related options to NetworkEnvironmentOptions | Sub-task | Closed | zhijiang
          18. Remove legacy fields for SingleInputGate | Sub-task | Closed | zhijiang
          19. Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService | Sub-task | Closed | zhijiang
          20. Introduce partition/gate setup to decouple task registration with NetworkEnvironment | Sub-task | Closed | Andrey Zagrebin
          21. Refactor IOMetrics to not distinguish between local/remote in/out bytes | Sub-task | Closed | zhijiang
          22. Introduce InputGateWithMetrics in Task to increment numBytesIn metric | Sub-task | Closed | zhijiang
          23. Consider introducing batch metric register in NetworkEnvironment | Sub-task | Closed | zhijiang
          24. Refactor ResultPartitionManager to break tie with Task | Sub-task | Resolved | Andrey Zagrebin
          25. Move network metrics setup into NetworkEnvironment | Sub-task | Closed | zhijiang
          26. Refactor the start method of ConnectionManager | Sub-task | Closed | zhijiang
          27. Move Task.inputGatesById to NetworkEnvironment | Sub-task | Closed | Andrey Zagrebin
          28. Consider naming convention for config options of shuffle services | Sub-task | Open | Unassigned
          29. Introduce an encapsulated metric group layout for shuffle API and deprecate old one | Sub-task | Closed | Andrey Zagrebin
          30. Remove getBufferProvider method from ResultPartitionWriter interface | Sub-task | Closed | zhijiang
          31. Switch Task from ResultPartition to ResultPartitionWriter interface | Sub-task | Closed | Andrey Zagrebin
          32. Make NetworkEnvironment#start() return the bound data port | Sub-task | Closed | zhijiang
          33. Remove getOwningTaskName method from InputGate | Sub-task | Resolved | zhijiang
          34. Refactor abstract InputGate to general interface | Sub-task | Closed | zhijiang
          35. Introduce NetworkEnvironment.getUnreleasedPartitions instead of using getResultPartitionManager | Sub-task | Closed | Andrey Zagrebin
          36. Introduce ShuffleService interface and its configuration | Sub-task | Resolved | Andrey Zagrebin
          37. Load shuffle service implementations from plugin manager | Sub-task | Open | Unassigned
          38. Make shuffle environment implementation independent with IOManager | Sub-task | Closed | zhijiang
          39. Remove abstract getPageSize method from InputGate | Sub-task | Resolved | zhijiang
          40. Create a separate maven module for Shuffle API | Sub-task | Open | Unassigned
          41. Remove ExecutionAttemptID argument from ResultPartitionFactory#create | Sub-task | Closed | zhijiang
          42. Add partition lifecycle related Shuffle API | Sub-task | Closed | Andrey Zagrebin
          43. Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes | Sub-task | Closed | Andrey Zagrebin
          44. Refactor the process of SchedulerNG#requestPartitionState | Sub-task | Open | zhijiang
          45. Remove getBufferSize method from BufferPoolFactory | Sub-task | Closed | zhijiang
          46. Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type | Sub-task | Closed | Andrey Zagrebin
          47. Refactor ShuffleMaster to optionally provide preferred TM location for produced partitions | Sub-task | Open | Unassigned

              People

              • Assignee: zjwang zhijiang
              • Reporter: zjwang zhijiang
              • Votes: 4
              • Watchers: 19

                  Time Tracking

                  • Estimated: Not Specified
                  • Remaining: 0h
                  • Logged: 13.5h