Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.6.0, 1.6.1, 2.0.0
-
None
Description
when a shuffle block size is huge, say a large array (array size more than 128MB), there will be performance issue for getting remote blocks. This is because network frame size is large, and when we are using a composite buffer, which will consolidate when the components number reaches maximum components number (default is 16) in netty underlying, performance issue will occurs. There will be too many memory copies inside netty's compositeBuffer.
How to reproduce:
sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Double](1024 * 1024 * 50)).iterator).reduce((a,b)=> a).length
In this case, the serialized result size of each task is about 400MB, the result will be transferred to driver as indirectResult. We can see after the data transferred to driver, on driver side there will still need a lot of time to process and the 3 CPUs (in this case, parallelism is 3) are fully utilized with system call very high. And this processing time is calculated as result getting time on webUI.
Such cases are very common in ML applications, which will return a large array from each executor.