Details
- Type: Improvement
- Status: Closed
- Priority: Major
- Resolution: Not A Problem
Description
See some background discussion here: http://stackoverflow.com/questions/34690682/spark-mlib-fpgrowth-job-fails-with-memory-error/
The FPGrowth model's run() method seems to do the following (a usage sketch follows this list):
- Count the transactions (to derive minCount from the minimum support)
- Generate frequent items (genFreqItems())
- Generate frequent itemsets (genFreqItemsets())
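For context, a minimal sketch of how such a model is typically trained with the MLlib API; the input path, the space-separated item format, and the parameter values here are hypothetical, not taken from this report:

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

// sc is an existing SparkContext; each record becomes one transaction,
// i.e. an array of unique items.
val transactions: RDD[Array[String]] =
  sc.textFile("data/transactions.txt").map(_.split(" "))

val model = new FPGrowth()
  .setMinSupport(0.01)   // an item must appear in at least 1% of transactions
  .setNumPartitions(10)  // partitions used by genFreqItems()/genFreqItemsets()
  .run(transactions)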
The model is trained on the outcome of those three steps. When generating frequent items, the code does the following:
data.flatMap { t =>
  val uniq = t.toSet
  if (t.size != uniq.size) {
    throw new SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
  }
  t
}.map(v => (v, 1L))
  .reduceByKey(partitioner, _ + _)
  .filter(_._2 >= minCount)
  .collect()      // materializes every frequent item on the driver
  .sortBy(-_._2)
  .map(_._1)
The collect() call in the snippet above is causing my executors to blow past any amount of memory I can give them. Is there a way to write genFreqItems() and genFreqItemsets() so they won't try to collect all frequent items in memory?
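For illustration, a hedged diagnostic sketch of the same pipeline stopped short of collect(); it reuses the data and minCount values from the snippet above (not defined here) and reports only how many frequent items collect() would pull back:

// Same aggregation as genFreqItems(), but count() returns a single Long
// to the driver instead of the full item array. Note that toSet silently
// deduplicates here rather than throwing on duplicate items.
val numFreqItems = data.flatMap(_.toSet)
  .map(v => (v, 1L))
  .reduceByKey(_ + _)
  .filter(_._2 >= minCount)
  .count()
println(s"collect() would pull $numFreqItems frequent items to the driver")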