Details
- Type: Sub-task
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Fix Version/s: 1.18.0, 1.17.1
Description
The background is similar to FLINK-33315.
A Hive table has a lot of data, and HiveSource#partitionBytes is 281 MB. When slotPerTM = 4, one TM runs 4 HiveSources at the same time.
How does the TaskExecutor submit a large task?
- TaskExecutor#loadBigData reads all bytes from the file into a SerializedValue<TaskInformation>
- The SerializedValue<TaskInformation> holds a byte[]
- This costs heap memory
- It will be greater than 281 MB, because it stores not only HiveSource#partitionBytes but also the rest of the TaskInformation.
- Generate the TaskInformation from the SerializedValue<TaskInformation>
- TaskExecutor#submitTask calls tdd.getSerializedTaskInformation().deserializeValue()
- tdd.getSerializedTaskInformation() is the SerializedValue<TaskInformation>
- This generates the TaskInformation
- TaskInformation includes the Configuration taskConfiguration
- The taskConfiguration includes StreamConfig#SERIALIZEDUDF
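The two-copy effect described above can be sketched with a simplified stand-in for Flink's SerializedValue<T> (this is not the real class; it only mimics the relevant behavior): the serialized byte[] stays on the heap, and deserializeValue() materializes a second, independent copy of the payload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Simplified stand-in for Flink's SerializedValue<T>. Unlike the real class,
// checked exceptions are wrapped to keep the sketch short.
class SerializedValueSketch<T extends Serializable> {
    private final byte[] bytes; // first big array: lives as long as this object

    SerializedValueSketch(T value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(value);
            }
            this.bytes = bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @SuppressWarnings("unchecked")
    T deserializeValue() {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject(); // second big object on the heap
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    int serializedSize() {
        return bytes.length;
    }
}

public class TwoCopiesDemo {
    public static void main(String[] args) {
        byte[] payload = new byte[1 << 20]; // stand-in for the 281 MB of partition bytes
        SerializedValueSketch<byte[]> sv = new SerializedValueSketch<>(payload);
        byte[] copy = sv.deserializeValue();
        // Both the serialized form and the deserialized copy now occupy the heap.
        System.out.println(sv.serializedSize() >= payload.length);
        System.out.println(copy != payload && copy.length == payload.length);
    }
}
```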
Based on the above process, TM memory holds 2 big byte arrays for each task:
- The SerializedValue<TaskInformation>
- The TaskInformation
When one TM runs 4 HiveSources at the same time, it holds 8 big byte arrays.
In our production environment, this situation also often leads to TM OOM.
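A back-of-envelope lower bound on the duplicated heap usage follows directly from the numbers in the report (281 MB per copy, 4 tasks per TM, 2 copies per task); the real footprint is larger because TaskInformation holds more than the partition bytes:

```java
public class HeapEstimate {
    // Lower bound: each task keeps both the SerializedValue<TaskInformation>
    // and the deserialized TaskInformation on the heap.
    static long estimateMb(long perCopyMb, int tasksPerTm, int copiesPerTask) {
        return perCopyMb * tasksPerTm * copiesPerTask;
    }

    public static void main(String[] args) {
        // 281 MB per copy, 4 HiveSources per TM, 2 copies per task
        System.out.println(estimateMb(281, 4, 2) + " MB"); // prints "2248 MB"
    }
}
```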
Solution:
This data is identical across the tasks because the PermanentBlobKey is the same. We can add a cache for it to reduce the memory and CPU cost.
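The caching idea can be sketched as follows (class and method names are hypothetical, not the actual Flink patch): keep one shared value per PermanentBlobKey in a TaskExecutor-level cache, so concurrent submissions with the same blob key deserialize the TaskInformation once and share it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache keyed by blob key (PermanentBlobKey in Flink): the
// loader runs at most once per key, and later submissions reuse the result.
class TaskInformationCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();

    // loader would deserialize the TaskInformation from the blob file.
    V getOrLoad(K blobKey, Function<K, V> loader) {
        return cache.computeIfAbsent(blobKey, loader);
    }

    void release(K blobKey) {
        // A real implementation would reference-count per running task and
        // evict only when the last task using the key finishes.
        cache.remove(blobKey);
    }
}
```

In this sketch the 4 HiveSource tasks on one TM would share a single deserialized copy instead of holding 4; eviction policy and thread-safe loading of the underlying blob are the parts the actual fix has to get right.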
Attachments
Issue Links
- links to