Flink / FLINK-27405

Refactor SourceCoordinator to abstract BaseCoordinator implementation


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      To solve the small-files issue caused by data skew, Flink Iceberg data shuffling was proposed (design doc: https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit#). The basic idea is to use a statistics operator to collect local statistics about the traffic distribution at the TaskManagers (workers). Local statistics are periodically sent to the statistics coordinator, which runs in the JobManager. Once the globally aggregated statistics are ready, the statistics coordinator broadcasts them to all operator instances. A customized partitioner then uses the global statistics, passed down from the statistics operator, to distribute data to the Iceberg writers.
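      The statistics flow described above can be sketched in plain Java. This is a minimal, hypothetical sketch, not the Iceberg implementation: the class `DataStatisticsSketch`, its methods, and the choice of per-key record counts as the "statistics" are assumptions. `aggregate` plays the coordinator's role of merging local statistics, and `assignKeys` plays the custom partitioner's role of mapping each key to a writer. The greedy heaviest-key-first assignment is a stand-in technique, not necessarily the partitioning scheme in the design doc.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the shuffling flow; not the Iceberg implementation.
 * "Statistics" are assumed to be per-key record counts.
 */
final class DataStatisticsSketch {

    private DataStatisticsSketch() {}

    /** Coordinator side: merge the local counts reported by each operator subtask. */
    static Map<String, Long> aggregate(List<Map<String, Long>> localStats) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStats) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    /**
     * Partitioner side: map every key to one writer index. Assumed strategy: visit keys
     * from heaviest to lightest and give each to the currently least-loaded writer.
     */
    static Map<String, Integer> assignKeys(Map<String, Long> global, int numWriters) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(global.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        long[] load = new long[numWriters];
        Map<String, Integer> assignment = new HashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            int target = 0;
            for (int w = 1; w < numWriters; w++) {
                if (load[w] < load[target]) {
                    target = w;
                }
            }
            load[target] += entry.getValue();
            assignment.put(entry.getKey(), target);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Long> global = aggregate(List.of(
                Map.of("a", 90L, "b", 5L),
                Map.of("a", 10L, "c", 40L, "d", 30L)));
        Map<String, Integer> assignment = assignKeys(global, 2);
        for (String key : List.of("a", "b", "c", "d")) {
            System.out.println(key + " -> writer " + assignment.get(key));
        }
    }
}
```

      With the local counts {a=90, b=5} and {a=10, c=40, d=30} and two writers, key a (100 records) goes to writer 0 and keys b, c and d (75 records in total) go to writer 1. Keeping each key on a single writer is what limits the number of files per key, while the least-loaded choice keeps writer loads roughly even.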

      While implementing Flink Iceberg data shuffling, we found that StatisticsCoordinator could share functionality with SourceCoordinator#runInEventLoop, that StatisticsCoordinatorContext needs a function similar to SourceCoordinatorContext#callInCoordinatorThread, and that the ExecutorThreadFactory logic in StatisticsCoordinatorProvider is almost the same as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. We would therefore like to refactor the source coordinator classes to abstract a general coordinator implementation, reducing duplicated code when other coordinators are added.
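      A shared base class of the kind proposed here might look like the following sketch. All names (BaseCoordinator, CoordinatorThreadFactory, StatisticsCoordinatorSketch, handleFailure, the errorMessage parameter) are hypothetical and use only java.util.concurrent; this is not Flink's OperatorCoordinator API, and the method names only echo runInEventLoop and callInCoordinatorThread from the existing source coordinator classes. Wrapping failures in IllegalStateException is also a choice of this sketch. The base class owns a single-threaded executor plus a thread factory that remembers the coordinator thread, so a subclass gets a serialized event loop without duplicating that code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical base class holding the threading logic shared by coordinators. */
abstract class BaseCoordinator implements AutoCloseable {
    /** Names the single coordinator thread and remembers it for "am I on it?" checks. */
    static final class CoordinatorThreadFactory implements ThreadFactory {
        private final String threadName;
        private volatile Thread coordinatorThread;

        CoordinatorThreadFactory(String threadName) { this.threadName = threadName; }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true);
            coordinatorThread = thread;
            return thread;
        }

        boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == coordinatorThread;
        }
    }

    private final CoordinatorThreadFactory threadFactory;
    private final ExecutorService coordinatorExecutor;

    protected BaseCoordinator(String coordinatorName) {
        this.threadFactory = new CoordinatorThreadFactory("coordinator-" + coordinatorName);
        this.coordinatorExecutor = Executors.newSingleThreadExecutor(threadFactory);
    }

    /** Queues an action on the coordinator thread; failures are passed to handleFailure. */
    protected void runInEventLoop(Runnable action, String actionName) {
        coordinatorExecutor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                handleFailure(actionName, t);
            }
        });
    }

    /** Runs a callable on the coordinator thread and blocks until its result is available. */
    protected <T> T callInCoordinatorThread(Callable<T> callable, String errorMessage) {
        try {
            if (threadFactory.isCurrentThreadCoordinatorThread()) {
                return callable.call(); // already on the loop: waiting on our own queue would deadlock
            }
            return coordinatorExecutor.submit(callable).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(errorMessage, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(errorMessage, e.getCause());
        } catch (Exception e) {
            throw new IllegalStateException(errorMessage, e);
        }
    }

    /** Subclass-specific reaction to a failed action, e.g. failing the job. */
    protected abstract void handleFailure(String actionName, Throwable cause);

    @Override
    public void close() {
        coordinatorExecutor.shutdown(); // lets already queued actions finish
    }
}

/** Hypothetical statistics coordinator: only its own state and handlers remain. */
final class StatisticsCoordinatorSketch extends BaseCoordinator {
    private final Map<String, Long> globalCounts = new HashMap<>(); // coordinator thread only

    StatisticsCoordinatorSketch() { super("statistics"); }

    void handleLocalStatistics(Map<String, Long> localCounts) {
        runInEventLoop(() -> localCounts.forEach((k, v) -> globalCounts.merge(k, v, Long::sum)),
                "merge local statistics");
    }

    long globalCount(String key) {
        return callInCoordinatorThread(() -> globalCounts.getOrDefault(key, 0L), "read global statistics");
    }

    @Override
    protected void handleFailure(String actionName, Throwable cause) {
        System.err.println("Failed to " + actionName + ": " + cause);
    }
}
```

      callInCoordinatorThread runs the callable inline when the caller is already on the coordinator thread, because submitting it to the single-threaded executor and blocking on the result would deadlock. Since the executor runs tasks in submission order, a globalCount call made after handleLocalStatistics sees the merged counts without any extra locking on globalCounts.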

       


          People

            Assignee: Unassigned
            Reporter: gang ye
            Votes: 0
            Watchers: 7
