Details
-
Bug
-
Status: Resolved
-
Critical
-
Resolution: Fixed
-
None
-
None
Description
The way groupByKey is written actually pulls the entire PairRDDFunctions into the 3 closures, sometimes resulting in gigantic task sizes:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. def createCombiner(v: V) = ArrayBuffer(v) def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2 val bufs = combineByKey[ArrayBuffer[V]]( createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false) bufs.mapValues(_.toIterable) }
Changing the functions from def to val would solve it.