Details
- Improvement
- Status: Resolved
- Minor
- Resolution: Not A Problem
- 3.1.0
- None
- None
Description
1. aggregateByKey, reduceByKey and foldByKey always perform mapSideCombine.
However, this can sometimes be skipped, especially in ML (RobustScaler):
vectors.mapPartitions { iter =>
  if (iter.hasNext) {
    val summaries = Array.fill(numFeatures)(
      new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError))
    while (iter.hasNext) {
      val vec = iter.next
      vec.foreach { (i, v) =>
        if (!v.isNaN) summaries(i) = summaries(i).insert(v)
      }
    }
    Iterator.tabulate(numFeatures)(i => (i, summaries(i).compress))
  } else Iterator.empty
}.reduceByKey { case (s1, s2) => s1.merge(s2) }
This reduceByKey in RobustScaler does not need mapSideCombine at all, because each partition emits every key at most once (via Iterator.tabulate). Similar places exist in KMeans, GMM, etc.
To my knowledge, we do not need mapSideCombine when the reduction factor is not high.
2. treeAggregate and treeReduce are based on foldByKey; the mapSideCombine in the first call of foldByKey can also be avoided.
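To make "reduction factor" concrete, here is a minimal plain-Scala sketch that does not use Spark at all. It models a partition as a sequence of key-value records and compares record counts before and after a per-partition combine. The names `ReductionFactorSketch`, `mapSideCombine` and `reductionFactor` are illustrative helpers invented for this sketch, not Spark APIs.

```scala
object ReductionFactorSketch {
  // Illustrative model only: a "partition" is a Seq of (key, value) records.
  type Partition = Seq[(Int, Long)]

  // Simulates a map-side combine: merge values that share a key within one partition.
  def mapSideCombine(part: Partition)(merge: (Long, Long) => Long): Partition =
    part.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(merge)) }.toSeq

  // Records before the combine divided by records after it, over all partitions.
  def reductionFactor(parts: Seq[Partition]): Double = {
    val before = parts.map(_.size).sum
    val after = parts.map(p => mapSideCombine(p)(_ + _).size).sum
    before.toDouble / after
  }

  def main(args: Array[String]): Unit = {
    val numFeatures = 4
    // RobustScaler-like output: every partition emits each key exactly once.
    val uniqueKeys: Seq[Partition] =
      Seq.fill(3)(Seq.tabulate(numFeatures)(i => (i, 1L)))
    // Word-count-like output: keys repeat many times inside a partition.
    val repeatedKeys: Seq[Partition] =
      Seq.fill(3)(Seq.tabulate(40)(i => (i % numFeatures, 1L)))

    println(reductionFactor(uniqueKeys))   // 1.0: the combine removes nothing
    println(reductionFactor(repeatedKeys)) // 10.0: 40 records shrink to 4 per partition
  }
}
```

With a factor of 1.0 the combine step only adds work on the map side without shrinking the shuffle, which is the situation described for RobustScaler.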
Map side combine in group by key case does not reduce the amount of data shuffled. Instead, it forces a lot more objects to go into old gen, and leads to worse GC.
So what about:
1. exposing mapSideCombine in aggregateByKey/reduceByKey/foldByKey, so that users can disable an unnecessary mapSideCombine;
2. disabling the mapSideCombine in the first call of foldByKey in treeAggregate and treeReduce;
3. disabling the unnecessary mapSideCombine in ML.
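For comparison, the full `combineByKey` overload in `PairRDDFunctions` already takes a `mapSideCombine` parameter, so a reduce without map-side combine can be sketched with the existing API. The helper name `reduceByKeyNoCombine`, the object name and the sample data below are hypothetical and only illustrate the idea; the sketch assumes Spark is on the classpath and runs in local mode.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object NoMapSideCombineSketch {
  // Hypothetical helper (not part of Spark): same result as reduceByKey,
  // but asks combineByKey to skip the map-side combine step.
  def reduceByKeyNoCombine[K: ClassTag, V: ClassTag](
      rdd: RDD[(K, V)],
      numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
    rdd.combineByKey[V](
      (v: V) => v, // createCombiner: the first value for a key is the combiner
      func,        // mergeValue: fold another value into the combiner
      func,        // mergeCombiners: merge two combiners for the same key
      new HashPartitioner(numPartitions),
      mapSideCombine = false)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("no-map-side-combine")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data: two keys spread over two partitions.
      val pairs = sc.parallelize(Seq((0, 1.0), (1, 2.0), (0, 3.0), (1, 4.0)), 2)
      val sums = reduceByKeyNoCombine(pairs, 2)(_ + _).collectAsMap()
      println(sums)
    } finally {
      sc.stop()
    }
  }
}
```

With `mapSideCombine = false` every record is shuffled as-is and all merging happens on the reduce side, so this only pays off when the per-partition reduction factor is close to 1.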
Friendly ping srowen huaxingao weichenxu123 hyukjin.kwon viirya
Issue Links
- relates to SPARK-772 groupByKey should disable map side combine (Resolved)