Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Done
None
Description
This is part 2 of the task deployment optimization. For the overall description and part 1, please see FLINK-23005.
For a vertex with a parallelism of 8k, the serialized ShuffleDescriptors in each TaskDeploymentDescriptor are larger than 700 KB. After compression, they are about 200 KB. Because each of the 8k TaskDeploymentDescriptors carries its own copy, their overall size exceeds 1.6 GB. Since Akka cannot send the messages as fast as the TaskDeploymentDescriptors are created, these descriptors pile up in the JobManager's heap and become a heavy burden for the garbage collector.
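As a quick check of the 1.6 GB figure, assuming "8k" means about 8,000 descriptors, using decimal units, and counting only the compressed ShuffleDescriptors:

```latex
8000 \times 200\ \mathrm{KB} = 1.6 \times 10^{6}\ \mathrm{KB} = 1.6\ \mathrm{GB}
```

The remaining fields of each TaskDeploymentDescriptor only add to this total.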
In a TaskDeploymentDescriptor, the JobInformation and TaskInformation are distributed via the blob server if their sizes exceed a threshold (configured by blob.offload.minsize). TaskExecutors fetch the information from the blob server once they start processing the TaskDeploymentDescriptor. As a result, the JobManager doesn't need to keep a copy for every TaskDeploymentDescriptor in heap memory until all of them are sent; only one copy is kept in the blob server. Like the JobInformation, the cached ShuffleDescriptors can be distributed via the blob server when their overall size exceeds the threshold.
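The offloading rule can be sketched as follows. `InMemoryBlobStore` and `MaybeOffloaded` are hypothetical names for this illustration, not Flink's actual classes or API; the in-memory map stands in for the blob server, and treating a payload whose size equals the threshold as offloadable is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the blob server: each stored payload gets a key and is kept exactly once.
class InMemoryBlobStore {
    private final Map<Long, byte[]> blobs = new HashMap<>();
    private long nextKey = 0;

    long put(byte[] data) {
        long key = nextKey++;
        blobs.put(key, data);
        return key;
    }

    byte[] get(long key) {
        byte[] data = blobs.get(key);
        if (data == null) {
            throw new IllegalStateException("No blob stored under key " + key);
        }
        return data;
    }

    int blobCount() {
        return blobs.size();
    }
}

// Hypothetical holder: either the serialized bytes themselves or a key into the blob store.
class MaybeOffloaded {
    final byte[] inlineBytes; // set when the value travels inside the deployment descriptor
    final Long blobKey;       // set when the value was offloaded to the blob store

    private MaybeOffloaded(byte[] inlineBytes, Long blobKey) {
        this.inlineBytes = inlineBytes;
        this.blobKey = blobKey;
    }

    boolean isOffloaded() {
        return blobKey != null;
    }

    // JobManager side: offload when the payload reaches the threshold (cf. blob.offload.minsize).
    static MaybeOffloaded serializeOrOffload(byte[] serialized, int minOffloadSize, InMemoryBlobStore store) {
        if (serialized.length >= minOffloadSize) {
            return new MaybeOffloaded(null, store.put(serialized));
        }
        return new MaybeOffloaded(serialized, null);
    }

    // TaskExecutor side: use the inline bytes, or fetch them from the blob store.
    byte[] resolve(InMemoryBlobStore store) {
        return isOffloaded() ? store.get(blobKey) : inlineBytes;
    }
}
```

Because the ShuffleDescriptors are serialized once and cached, every deployment descriptor built from that cache can carry the same small offloaded reference, so the blob store holds a single copy regardless of how many consumer tasks are deployed.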
This improvement can help avoid long garbage collection pauses during task deployment.
The ShuffleDescriptors cached in the blob server are removed once the partitions they refer to are no longer valid. This ensures the blob server does not fill up with cached ShuffleDescriptors, even on a long-running session cluster.
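A minimal sketch of this lifecycle, assuming a JobManager-side cache keyed by a result identifier string: the descriptors are serialized and uploaded on first use, reused afterwards, and the blob is deleted when the partitions are invalidated. `ShuffleDescriptorBlobCache`, `getOrOffload`, and `invalidate` are hypothetical names, and the map named `blobServer` only stands in for the real blob server.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical JobManager-side cache: at most one offloaded blob per produced result.
class ShuffleDescriptorBlobCache {
    private final Map<String, byte[]> blobServer = new HashMap<>(); // stand-in for the blob server
    private final Map<String, String> blobKeyByResult = new HashMap<>();
    private int nextKey = 0;

    // Serialize and upload only on first use; later consumers reuse the same blob key.
    String getOrOffload(String resultId, Supplier<byte[]> serializer) {
        return blobKeyByResult.computeIfAbsent(resultId, id -> {
            String key = "blob-" + (nextKey++);
            blobServer.put(key, serializer.get());
            return key;
        });
    }

    // Called when the partitions behind resultId are no longer valid (e.g. released).
    void invalidate(String resultId) {
        String key = blobKeyByResult.remove(resultId);
        if (key != null) {
            blobServer.remove(key);
        }
    }

    int storedBlobs() {
        return blobServer.size();
    }
}
```

Tying deletion to partition invalidation, rather than to job termination, is what keeps the blob server from growing on a session cluster that runs many jobs over time.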
In part 3, we will limit the size of the ShuffleDescriptors cached in the PermanentBlobCache on the TaskExecutor, so that cached ShuffleDescriptors cannot cause the TaskExecutor to run out of space. For more details, please see FLINK-23354.
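This section does not say which eviction policy part 3 uses. As one possible illustration only, a cache bounded by total bytes with least-recently-used (LRU) eviction could look like the sketch below; the class name `SizeLimitedBlobCache` and the LRU policy are assumptions, not what FLINK-23354 necessarily implements.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical TaskExecutor-side cache bounded by total bytes, evicting least recently used entries.
class SizeLimitedBlobCache {
    private final long maxTotalBytes;
    private long totalBytes = 0;
    // accessOrder = true: iteration runs from least to most recently accessed entry.
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    SizeLimitedBlobCache(long maxTotalBytes) {
        this.maxTotalBytes = maxTotalBytes;
    }

    void put(String key, byte[] data) {
        byte[] previous = entries.put(key, data);
        if (previous != null) {
            totalBytes -= previous.length;
        }
        totalBytes += data.length;
        evictIfNeeded(key);
    }

    // Returns null on a miss; the caller would then download the blob again.
    byte[] get(String key) {
        return entries.get(key);
    }

    private void evictIfNeeded(String justInserted) {
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (totalBytes > maxTotalBytes && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            if (eldest.getKey().equals(justInserted)) {
                continue; // never evict the entry that is about to be used
            }
            totalBytes -= eldest.getValue().length;
            it.remove();
        }
    }

    long totalBytes() {
        return totalBytes;
    }

    boolean contains(String key) {
        return entries.containsKey(key); // does not change access order
    }
}
```

An evicted entry is not lost: it remains in the blob server and can be fetched again on the next deployment that needs it, which is what makes a bounded cache safe here.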