FLINK-23005: Cache the compressed serialized value of ShuffleDescriptors
Parent: FLINK-21110 - Optimize scheduler performance for large-scale jobs


    Description

      Introduction

      The optimizations introduced in FLINK-21110 have so far improved the performance of job initialization, failover, and partition releasing. However, task deployment is still slow. For a job with two vertices, each with a parallelism of 8k and connected by an all-to-all edge, it takes 32.611s to deploy all the tasks and transition them to RUNNING. With a parallelism of 16k, it may take more than two minutes.

      Since the TaskDeploymentDescriptors are created in the JobManager's main thread, the JobManager cannot process other Akka messages, such as heartbeats or task status updates, for more than two minutes.

       

      In summary, there are currently two issues with task deployment for large-scale jobs:

      1. It takes a long time to deploy tasks, especially for all-to-all edges.
      2. Heartbeat timeouts may occur during or after task deployment. For streaming jobs, this causes a failover of the entire region. The job may never transition to RUNNING, because another heartbeat timeout can occur while the new tasks are being deployed.

      Proposal

      Task deployment involves the following steps (a rough sketch follows the list):

      1. The JobManager creates a TaskDeploymentDescriptor for each task in the main thread.
      2. The TaskDeploymentDescriptor is serialized in the future executor.
      3. Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC calls.
      4. The TaskExecutors create a new task thread and execute it.
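      The sketch below is a self-contained illustration of steps 1-3; the class and method names are placeholders, not Flink's internal API. It shows how step 1 keeps a single thread busy while serialization (step 2) and the RPC hand-off (step 3) run on a separate executor.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class DeploymentFlowSketch {

    // Placeholder for a TaskDeploymentDescriptor; the real descriptor also carries
    // job information, task information and the ShuffleDescriptors.
    record DeploymentDescriptor(int subtaskIndex, byte[] payload) implements Serializable {}

    static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService futureExecutor = Executors.newFixedThreadPool(4);

        // Step 1: descriptors are created one after another in the "main thread";
        // while this runs, that thread cannot handle any other messages.
        List<DeploymentDescriptor> descriptors = IntStream.range(0, 8)
                .mapToObj(i -> new DeploymentDescriptor(i, new byte[1024]))
                .toList();

        // Step 2: serialization is offloaded to the future executor.
        // Step 3: the serialized descriptor would then be sent to a TaskExecutor
        // via an RPC call (simulated here by the print statement).
        CompletableFuture<?>[] submissions = descriptors.stream()
                .map(tdd -> CompletableFuture
                        .supplyAsync(() -> serialize(tdd), futureExecutor)
                        .thenAccept(bytes ->
                                System.out.println("submitTask RPC, " + bytes.length + " bytes")))
                .toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(submissions).join();
        futureExecutor.shutdown();
        // Step 4 happens on the TaskExecutor: it deserializes the descriptor,
        // creates a task thread, and starts it.
    }
}
{code}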

       

      The optimization consists of three parts:

      1. Cache the compressed serialized value of ShuffleDescriptors

      The ShuffleDescriptors in a TaskDeploymentDescriptor describe the IntermediateResultPartitions that a task consumes. For downstream vertices connected by an all-to-all edge with parallelism N, the scheduler currently calculates the N ShuffleDescriptors N times, once for each downstream vertex. However, these vertices all consume the same IntermediateResultPartitions and therefore share the same ShuffleDescriptors. Instead of calculating them for each downstream vertex individually, we can calculate them once and cache them. This decreases the overall complexity of creating the TaskDeploymentDescriptors from O(N^2) to O(N).
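      The following is a rough sketch of the caching idea; the class and its names are illustrative, not Flink's actual implementation. The descriptors of a consumed result are computed once and shared by all N downstream tasks:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative cache: all downstream tasks of an all-to-all edge consume the same
// IntermediateResultPartitions, so their ShuffleDescriptors are computed once per
// consumed result instead of once per downstream task (O(N^2) -> O(N)).
final class ShuffleDescriptorCache<K, D> {

    private final Map<K, List<D>> cache = new ConcurrentHashMap<>();
    private final Function<K, List<D>> computeDescriptors;

    ShuffleDescriptorCache(Function<K, List<D>> computeDescriptors) {
        this.computeDescriptors = computeDescriptors;
    }

    /** Returns the descriptors for the given consumed result, computing them only once. */
    List<D> getOrCompute(K consumedResultId) {
        return cache.computeIfAbsent(consumedResultId, computeDescriptors);
    }

    /** Drops the cached entry, e.g. when the producers change after a failover. */
    void invalidate(K consumedResultId) {
        cache.remove(consumedResultId);
    }
}
{code}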

      Furthermore, we don't need to serialize the same ShuffleDescriptors N times, so we can cache the serialized value of the ShuffleDescriptors instead of the original objects. To decrease the size of the Akka messages and reduce the transmission of duplicated data over the network, this serialized value can also be compressed.
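      Below is a sketch of how the cached value could be produced, assuming plain Java serialization and java.util.zip compression; Flink's actual serialization and compression utilities may differ:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Illustrative helper: serialize the shared ShuffleDescriptors once, compress the
// bytes, and reuse the result in every TaskDeploymentDescriptor so that the Akka
// messages stay small and no duplicated data is sent over the network.
final class ShuffleDescriptorCompression {

    static byte[] serializeAndCompress(Serializable shuffleDescriptors) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Deflater deflater = new Deflater(Deflater.BEST_SPEED);
        try (ObjectOutputStream oos =
                     new ObjectOutputStream(new DeflaterOutputStream(bos, deflater))) {
            oos.writeObject(shuffleDescriptors);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end();
        }
        // These compressed bytes are what gets cached and shared across descriptors.
        return bos.toByteArray();
    }

    private ShuffleDescriptorCompression() {}
}
{code}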

      2. Distribute the ShuffleDescriptors via the blob server (see FLINK-23218)

      3. Limit the size of ShuffleDescriptors in the PermanentBlobCache on the TaskExecutor (see FLINK-23354)

      Summary

      In summary, the optimization of task deployment introduces a cache for the ShuffleDescriptors contained in the TaskDeploymentDescriptors. We cache the compressed serialized value of the ShuffleDescriptors; if the size of this value exceeds a configurable threshold, it is distributed via the blob server instead (a minimal sketch of this decision follows).
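      The sketch below illustrates that decision with hypothetical names; the actual offloading mechanism and its threshold configuration belong to FLINK-23218:

{code:java}
import java.util.function.Function;

// Illustrative holder: a small compressed ShuffleDescriptor group stays inline in
// the TaskDeploymentDescriptor; a large one is uploaded to the blob server once,
// and only the returned blob key is shipped with each descriptor.
final class InlineOrOffloaded<K> {

    private final byte[] inlineBytes; // non-null when kept inline
    private final K blobKey;          // non-null when offloaded to the blob server

    private InlineOrOffloaded(byte[] inlineBytes, K blobKey) {
        this.inlineBytes = inlineBytes;
        this.blobKey = blobKey;
    }

    static <K> InlineOrOffloaded<K> of(
            byte[] compressedSerializedValue,
            int offloadThresholdBytes,
            Function<byte[], K> uploadToBlobServer) {
        if (compressedSerializedValue.length <= offloadThresholdBytes) {
            return new InlineOrOffloaded<>(compressedSerializedValue, null);
        }
        // Uploaded once; every TaskDeploymentDescriptor of the consumer vertex
        // references the same blob key.
        return new InlineOrOffloaded<>(null, uploadToBlobServer.apply(compressedSerializedValue));
    }

    boolean isOffloaded() {
        return blobKey != null;
    }

    byte[] inlineBytes() {
        return inlineBytes;
    }

    K blobKey() {
        return blobKey;
    }
}
{code}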

      Comparison

      We implemented a PoC and ran an experiment to evaluate the performance of the optimization. We chose a streaming job for the experiment because none of its tasks start running until all tasks are deployed, which rules out other interfering factors. The job contains two vertices, a source and a sink, connected by an all-to-all edge.

      The results below show the time interval between the moment the first task transitions to DEPLOYING and the moment the last task transitions to RUNNING:

      Parallelism (source * sink)    Before       After
      8000*8000                      32.611s      6.480s
      16000*16000                    128.408s     19.051s


      People

      Assignee: Zhilong Hong (Thesharing)
      Reporter: Zhilong Hong (Thesharing)
